You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/19 00:34:04 UTC

[1/3] aurora git commit: Remove legacy commons ZK code

Repository: aurora
Updated Branches:
  refs/heads/master c6388774b -> 15cb049f3


http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 1e2e01d..8f4f63c 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -159,7 +159,6 @@ public class CommandLineTest {
     expected.scheduling.schedulingMaxBatchSize = 42;
     expected.scheduling.maxTasksPerScheduleAttempt = 42;
     expected.async.asyncWorkerThreads = 42;
-    expected.zk.useCurator = false;
     expected.zk.inProcess = true;
     expected.zk.zkEndpoints = ImmutableList.of(InetSocketAddress.createUnresolved("testing", 42));
     expected.zk.chrootPath = "testing";
@@ -311,7 +310,6 @@ public class CommandLineTest {
         "-scheduling_max_batch_size=42",
         "-max_tasks_per_schedule_attempt=42",
         "-async_worker_threads=42",
-        "-zk_use_curator=false",
         "-zk_in_proc=true",
         "-zk_endpoints=testing:42",
         "-zk_chroot_path=testing",

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
deleted file mode 100644
index cec54e5..0000000
--- a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.common.testing.TearDownTestCase;
-import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.SingletonService;
-import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-
-abstract class AbstractDiscoveryModuleTest extends TearDownTestCase {
-
-  @Test
-  public void testBindingContract() {
-    ZooKeeperConfig zooKeeperConfig =
-        new ZooKeeperConfig(
-            isCurator(),
-            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
-            Optional.of("/chroot"),
-            false, // inProcess
-            Amount.of(1, Time.DAYS),
-            Amount.of(1, Time.DAYS),
-            Optional.of(Credentials.digestCredentials("test", "user")));
-
-    Injector injector =
-        Guice.createInjector(
-            new AbstractModule() {
-              @Override
-              protected void configure() {
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
-                    .toInstance(
-                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
-                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
-                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
-
-                bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-
-                bindExtraRequirements(binder());
-              }
-            },
-            createModule("/discovery/path", zooKeeperConfig));
-
-    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
-    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
-  }
-
-  void bindExtraRequirements(Binder binder) {
-    // Noop.
-  }
-
-  abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig);
-
-  abstract boolean isCurator();
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index 226b068..02c8183 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -25,7 +25,7 @@ import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.Encoding;
 import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
 import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.framework.CuratorFramework;
@@ -38,7 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
 
   static final String GROUP_PATH = "/group/root";
   static final String MEMBER_TOKEN = "member_";
-  static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
+  static final Codec<ServiceInstance> CODEC = Encoding.JSON_CODEC;
   static final int PRIMARY_PORT = 42;
 
   private CuratorFramework client;
@@ -55,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
     groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
 
     Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
-    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, Encoding.JSON_CODEC);
   }
 
   final CuratorFramework startNewClient() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
deleted file mode 100644
index 7a4c4dd..0000000
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.inject.Module;
-
-public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
-
-  @Override
-  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
-    return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-  }
-
-  @Override
-  boolean isCurator() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
deleted file mode 100644
index 42a2224..0000000
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.easymock.Capture;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class CommonsServiceGroupMonitorTest extends EasyMockTest {
-
-  private DynamicHostSet<ServiceInstance> serverSet;
-  private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture;
-  private Command stopCommand;
-
-  @Before
-  public void setUp() throws Exception {
-    serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
-    hostChangeMonitorCapture = createCapture();
-    stopCommand = createMock(Command.class);
-  }
-
-  private void expectSuccessfulWatch() throws Exception {
-    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand);
-  }
-
-  private void expectFailedWatch() throws Exception {
-    DynamicHostSet.MonitorException watchError =
-        new DynamicHostSet.MonitorException(
-            "Problem watching service group",
-            new RuntimeException());
-    expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError);
-  }
-
-  @Test
-  public void testNominalLifecycle() throws Exception {
-    expectSuccessfulWatch();
-
-    stopCommand.execute();
-    expectLastCall();
-
-    control.replay();
-
-    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
-    groupMonitor.start();
-    groupMonitor.close();
-  }
-
-  @Test
-  public void testExceptionalLifecycle() throws Exception {
-    expectFailedWatch();
-    control.replay();
-
-    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
-    try {
-      groupMonitor.start();
-      fail();
-    } catch (ServiceGroupMonitor.MonitorException e) {
-      // expected
-    }
-
-    // Close on a non-started monitor should be allowed.
-    groupMonitor.close();
-  }
-
-  @Test
-  public void testNoHosts() throws Exception {
-    expectSuccessfulWatch();
-    control.replay();
-
-    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
-
-    groupMonitor.start();
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
-
-    hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of());
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
-  }
-
-  @Test
-  public void testHostUpdates() throws Exception {
-    expectSuccessfulWatch();
-    control.replay();
-
-    CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
-    groupMonitor.start();
-
-    ImmutableSet<ServiceInstance> twoHosts =
-        ImmutableSet.of(serviceInstance("one"), serviceInstance("two"));
-    hostChangeMonitorCapture.getValue().onChange(twoHosts);
-    assertEquals(twoHosts, groupMonitor.get());
-
-    ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one"));
-    hostChangeMonitorCapture.getValue().onChange(oneHost);
-    assertEquals(oneHost, groupMonitor.get());
-
-    ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three"));
-    hostChangeMonitorCapture.getValue().onChange(anotherHost);
-    assertEquals(anotherHost, groupMonitor.get());
-
-    ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of();
-    hostChangeMonitorCapture.getValue().onChange(noHosts);
-    assertEquals(noHosts, groupMonitor.get());
-  }
-
-  private ServiceInstance serviceInstance(String hostName) {
-    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
index f1a02e4..f847882 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
@@ -13,36 +13,67 @@
  */
 package org.apache.aurora.scheduler.discovery;
 
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import com.google.inject.Module;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 
 import org.apache.aurora.common.application.ShutdownRegistry;
 import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.zookeeper.data.ACL;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
-public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+public class CuratorDiscoveryModuleTest extends TearDownTestCase {
 
-  @Override
-  void bindExtraRequirements(Binder binder) {
-    ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
-    binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-    addTearDown(shutdownRegistry::execute);
-  }
+  @Test
+  public void testBindingContract() {
+    ZooKeeperConfig zooKeeperConfig =
+        new ZooKeeperConfig(
+            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
+            Optional.of("/chroot"),
+            false, // inProcess
+            Amount.of(1, Time.DAYS),
+            Amount.of(1, Time.DAYS),
+            Optional.of(Credentials.digestCredentials("test", "user")));
 
-  @Override
-  Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
-    return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-  }
+    Injector injector =
+        Guice.createInjector(
+            new AbstractModule() {
+              @Override
+              protected void configure() {
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+                    .toInstance(
+                        ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
+                bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
+                    .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
+
+                bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+
+                ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+                bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+                addTearDown(shutdownRegistry::execute);
+              }
+            },
+            new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig));
 
-  @Override
-  boolean isCurator() {
-    return false;
+    assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
+    assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
index baee123..f1ea9e6 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
@@ -37,7 +37,6 @@ public class ZooKeeperConfigTest {
   @Test(expected = IllegalArgumentException.class)
   public void testEmptyServers() {
     new ZooKeeperConfig(
-        false,
         ImmutableList.of(),
         Optional.absent(),
         false,
@@ -50,7 +49,6 @@ public class ZooKeeperConfigTest {
   public void testWithCredentials() {
     ZooKeeperConfig config =
         new ZooKeeperConfig(
-            false,
             SERVERS,
             Optional.absent(),
             false,
@@ -72,9 +70,8 @@ public class ZooKeeperConfigTest {
 
   @Test
   public void testCreateFactory() {
-    ZooKeeperConfig config = ZooKeeperConfig.create(true, SERVERS);
+    ZooKeeperConfig config = ZooKeeperConfig.create(SERVERS);
 
-    assertTrue(config.isUseCurator());
     assertEquals(SERVERS, ImmutableList.copyOf(config.getServers()));
     assertFalse(config.getChrootPath().isPresent());
     assertFalse(config.isInProcess());

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/http/LeaderHealthTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/LeaderHealthTest.java b/src/test/java/org/apache/aurora/scheduler/http/LeaderHealthTest.java
index d21a38e..d3e4580 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/LeaderHealthTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/LeaderHealthTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.http;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 
-import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.scheduler.http.LeaderRedirect.LeaderStatus;
 import org.junit.Before;
@@ -31,7 +30,7 @@ public class LeaderHealthTest extends EasyMockTest {
   private LeaderHealth leaderHealth;
 
   @Before
-  public void setUp() throws MonitorException {
+  public void setUp() {
     leaderRedirect = createMock(LeaderRedirect.class);
     leaderHealth = new LeaderHealth(leaderRedirect);
   }


[3/3] aurora git commit: Remove legacy commons ZK code

Posted by wf...@apache.org.
Remove legacy commons ZK code

Reviewed at https://reviews.apache.org/r/62652/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/15cb049f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/15cb049f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/15cb049f

Branch: refs/heads/master
Commit: 15cb049f3b5d1a3d662e8a396ce7020b107a2fe8
Parents: c638877
Author: Bill Farner <wf...@apache.org>
Authored: Wed Oct 18 17:33:50 2017 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Oct 18 17:33:50 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   4 +-
 .../aurora/common/net/pool/DynamicHostSet.java  |  59 --
 .../aurora/common/zookeeper/Candidate.java      |  78 ---
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ----
 .../aurora/common/zookeeper/Encoding.java       |  87 +++
 .../apache/aurora/common/zookeeper/Group.java   | 674 -------------------
 .../aurora/common/zookeeper/ServerSet.java      |  74 --
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ----------
 .../aurora/common/zookeeper/ServerSets.java     | 118 ----
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ----
 .../common/zookeeper/ZooKeeperClient.java       | 372 ----------
 .../aurora/common/zookeeper/ZooKeeperUtils.java | 106 +--
 .../testing/BaseZooKeeperClientTest.java        | 140 ----
 .../zookeeper/testing/ZooKeeperTestServer.java  |   6 +-
 .../common/zookeeper/CandidateImplTest.java     | 165 -----
 .../aurora/common/zookeeper/EncodingTest.java   |  44 ++
 .../aurora/common/zookeeper/GroupTest.java      | 321 ---------
 .../aurora/common/zookeeper/JsonCodecTest.java  |  18 +-
 .../common/zookeeper/ServerSetImplTest.java     | 258 -------
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 --
 .../zookeeper/SingletonServiceImplTest.java     | 243 -------
 .../common/zookeeper/ZooKeeperClientTest.java   | 210 ------
 .../common/zookeeper/ZooKeeperUtilsTest.java    |  76 +--
 .../CommonsServiceDiscoveryModule.java          | 102 ---
 .../discovery/CommonsServiceGroupMonitor.java   |  59 --
 .../CuratorServiceDiscoveryModule.java          |   4 +-
 .../discovery/FlaggedZooKeeperConfig.java       |   8 -
 .../discovery/ServiceDiscoveryModule.java       |  11 +-
 .../scheduler/discovery/ZooKeeperConfig.java    |  12 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  57 +-
 .../scheduler/config/CommandLineTest.java       |   2 -
 .../discovery/AbstractDiscoveryModuleTest.java  |  82 ---
 .../discovery/BaseCuratorDiscoveryTest.java     |   6 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 -
 .../CommonsServiceGroupMonitorTest.java         | 137 ----
 .../discovery/CuratorDiscoveryModuleTest.java   |  63 +-
 .../discovery/ZooKeeperConfigTest.java          |   5 +-
 .../aurora/scheduler/http/LeaderHealthTest.java |   3 +-
 38 files changed, 237 insertions(+), 4038 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 079f495..f4cc416 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -18,8 +18,10 @@
 - Increase default ZooKeeper session timeout from 4 to 15 seconds.
 - Add option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
 
-### Deprecations and removals
+### Deprecations and removals:
 
+- Removed the deprecated command line argument `-zk_use_curator`, removing the choice to use the
+  legacy ZooKeeper client.
 - Removed the `rewriteConfigs` thrift API call in the scheduler. This was a last-ditch mechanism
   to modify scheduler state on the fly. It was considered extremely risky to use since its
   inception, and is safer to abandon due to its lack of use and likelihood for code rot.

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
deleted file mode 100644
index df469ef..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-
-/**
- * A host set that can be monitored for changes.
- *
- * @param <T> The type that is used to identify members of the host set.
- */
-public interface DynamicHostSet<T> {
-
-  /**
-   * Registers a monitor to receive change notices for this server set as long as this jvm process
-   * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
-   * The monitor will be notified if the membership set or parameters of existing members have
-   * changed.
-   *
-   * @param monitor the server set monitor to call back when the host set changes
-   * @return A command which, when executed, will stop monitoring the host set.
-   * @throws MonitorException if there is a problem monitoring the host set
-   */
-  Command watch(HostChangeMonitor<T> monitor) throws MonitorException;
-
-  /**
-   * An interface to an object that is interested in receiving notification whenever the host set
-   * changes.
-   */
-  interface HostChangeMonitor<T> {
-
-    /**
-     * Called when either the available set of services changes (when a service dies or a new
-     * instance comes on-line) or when an existing service advertises a status or health change.
-     *
-     * @param hostSet the current set of available ServiceInstances
-     */
-    void onChange(ImmutableSet<T> hostSet);
-  }
-
-  class MonitorException extends Exception {
-    public MonitorException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
deleted file mode 100644
index 75c1b14..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Interface definition for becoming or querying for a ZooKeeper-based group leader.
- */
-public interface Candidate {
-
-  /**
-   * Returns the current group leader by querying ZooKeeper synchronously.
-   *
-   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
-   *     no leader
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading the leader information
-   * @throws InterruptedException if this thread is interrupted getting the leader
-   */
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
-
-  /**
-   * Encapsulates a leader that can be elected and subsequently defeated.
-   */
-  interface Leader {
-
-    /**
-     * Called when this leader has been elected.
-     *
-     * @param abdicate a command that can be used to abdicate leadership and force a new election
-     */
-    void onElected(ExceptionalCommand<JoinException> abdicate);
-
-    /**
-     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
-     * external event causes the leader to lose its leadership role (session expiration).
-     */
-    void onDefeated();
-  }
-
-  /**
-   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
-   * Upon election, the {@code onElected} callback will be executed and a command that can be used
-   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
-   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
-   * successfully abdicate are removed from the group and will not be eligible for leadership
-   * election unless {@link #offerLeadership(Leader)} is called again.
-   *
-   * @param leader the leader to notify of election and defeat events
-   * @throws JoinException if there was a problem joining the group
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to join the group and determine initial
-   *     election results
-   * @return a supplier that can be queried to find out if this leader is currently elected
-   */
-  public Supplier<Boolean> offerLeadership(Leader leader)
-        throws JoinException, WatchException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
deleted file mode 100644
index 98b5ee4..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements leader election for small groups of candidates.  This implementation is subject to the
- * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
- * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
- */
-public class CandidateImpl implements Candidate {
-  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
-
-  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
-
-  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
-    try {
-      return InetAddress.getLocalHost().getHostAddress().getBytes();
-    } catch (UnknownHostException e) {
-      LOG.warn("Failed to determine local address!", e);
-      return UNKNOWN_CANDIDATE_DATA;
-    }
-  };
-
-  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
-      candidates -> Ordering.natural().min(candidates);
-
-  private final Group group;
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given {@code group}.
-   */
-  public CandidateImpl(Group group) {
-    this.group = Preconditions.checkNotNull(group);
-  }
-
-  @Override
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-
-    String leaderId = getLeader(group.getMemberIds());
-    return leaderId == null
-        ? Optional.<byte[]>absent()
-        : Optional.of(group.getMemberData(leaderId));
-  }
-
-  @Override
-  public Supplier<Boolean> offerLeadership(final Leader leader)
-      throws JoinException, WatchException, InterruptedException {
-
-    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
-
-    final AtomicBoolean elected = new AtomicBoolean(false);
-    final AtomicBoolean abdicated = new AtomicBoolean(false);
-    group.watch(memberIds -> {
-      boolean noCandidates = Iterables.isEmpty(memberIds);
-      String memberId = membership.getMemberId();
-
-      if (noCandidates) {
-        LOG.warn("All candidates have temporarily left the group: " + group);
-      } else if (!Iterables.contains(memberIds, memberId)) {
-        LOG.error(
-            "Current member ID {} is not a candidate for leader, current voting: {}",
-            memberId, memberIds);
-      } else {
-        boolean electedLeader = memberId.equals(getLeader(memberIds));
-        boolean previouslyElected = elected.getAndSet(electedLeader);
-
-        if (!previouslyElected && electedLeader) {
-          LOG.info("Candidate {} is now leader of group: {}",
-              membership.getMemberPath(), memberIds);
-
-          leader.onElected(() -> {
-            membership.cancel();
-            abdicated.set(true);
-          });
-        } else if (!electedLeader) {
-          if (previouslyElected) {
-            leader.onDefeated();
-          }
-          LOG.info(
-              "Candidate {} waiting for the next leader election, current voting: {}",
-              membership.getMemberPath(), memberIds);
-        }
-      }
-    });
-
-    return () -> !abdicated.get() && elected.get();
-  }
-
-  @Nullable
-  private String getLeader(Iterable<String> memberIds) {
-    return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
new file mode 100644
index 0000000..204f5c4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * Utility class for encoding and decoding data stored in ZooKeeper nodes.
+ */
+public class Encoding {
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * This is the default encoding for service instance data in ZooKeeper.
+   */
+  public static final Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  private Encoding() {
+    // Utility class.
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, with given endpoints and codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint(address.getHostName(), address.getPort()), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
deleted file mode 100644
index 2720dd1..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.Commands;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups.  The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
- * each member of a group.
- */
-public class Group {
-  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
-
-  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
-  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
-  private final ZooKeeperClient zkClient;
-  private final ImmutableList<ACL> acl;
-  private final String path;
-
-  private final NodeScheme nodeScheme;
-  private final Predicate<String> nodeNameFilter;
-
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a group rooted at the given {@code path}.  Paths must be absolute and trailing or
-   * duplicate slashes will be normalized.  For example, all the following paths would create a
-   * group at the normalized path /my/distributed/group:
-   * <ul>
-   *   <li>/my/distributed/group
-   *   <li>/my/distributed/group/
-   *   <li>/my/distributed//group
-   * </ul>
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the absolute persistent path that represents this group
-   * @param nodeScheme the scheme that defines how nodes are created
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.acl = ImmutableList.copyOf(acl);
-    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
-    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
-    nodeNameFilter = Group.this.nodeScheme::isMember;
-
-    backoffHelper = new BackoffHelper();
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
-   * {@code namePrefix} of 'member_'.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
-   * {@link DefaultScheme} using {@code namePrefix}.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
-    this(zkClient, acl, path, new DefaultScheme(namePrefix));
-  }
-
-  public String getMemberPath(String memberId) {
-    return path + "/" + MorePreconditions.checkNotBlank(memberId);
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public String getMemberId(String nodePath) {
-    MorePreconditions.checkNotBlank(nodePath);
-    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
-        "Not a member of this group[%s]: %s", path, nodePath);
-
-    String memberId = StringUtils.substringAfterLast(nodePath, "/");
-    Preconditions.checkArgument(nodeScheme.isMember(memberId),
-        "Not a group member: %s", memberId);
-    return memberId;
-  }
-
-  /**
-   * Returns the current list of group member ids by querying ZooKeeper synchronously.
-   *
-   * @return the ids of all the present members of this group
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this group's member ids
-   * @throws InterruptedException if this thread is interrupted listing the group members
-   */
-  public Iterable<String> getMemberIds()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
-  }
-
-  /**
-   * Gets the data for one of this groups members by querying ZooKeeper synchronously.
-   *
-   * @param memberId the id of the member whose data to retrieve
-   * @return the data associated with the {@code memberId}
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this member's data
-   * @throws InterruptedException if this thread is interrupted retrieving the member data
-   */
-  public byte[] getMemberData(String memberId)
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return zkClient.get().getData(getMemberPath(memberId), false, null);
-  }
-
-  /**
-   * Represents membership in a distributed group.
-   */
-  public interface Membership {
-
-    /**
-     * Returns the persistent ZooKeeper path that represents this group.
-     */
-    String getGroupPath();
-
-    /**
-     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberId();
-
-    /**
-     * Returns the full ZooKeeper path to this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberPath();
-
-    /**
-     * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
-     * {@link Group#join()}.
-     *
-     * @return the new membership data
-     * @throws UpdateException if there was a problem updating the membership data
-     */
-    byte[] updateMemberData() throws UpdateException;
-
-    /**
-     * Cancels group membership by deleting the associated ZooKeeper member node.
-     *
-     * @throws JoinException if there is a problem deleting the node
-     */
-    void cancel() throws JoinException;
-  }
-
-  /**
-   * Indicates an error joining a group.
-   */
-  public static class JoinException extends Exception {
-    public JoinException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error updating a group member's data.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, null)}.
-   */
-  public final Membership join() throws JoinException, InterruptedException {
-    return join(NO_MEMBER_DATA, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(memberData, null)}.
-   */
-  public final Membership join(Supplier<byte[]> memberData)
-      throws JoinException, InterruptedException {
-
-    return join(memberData, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, onLoseMembership)}.
-   */
-  public final Membership join(@Nullable final Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    return join(NO_MEMBER_DATA, onLoseMembership);
-  }
-
-  /**
-   * Joins this group and returns the resulting Membership when successful.  Membership will be
-   * automatically cancelled when the current jvm process dies; however the returned Membership
-   * object can be used to cancel membership earlier.  Unless
-   * {@link Group.Membership#cancel()} is called the membership will
-   * be maintained by re-establishing it silently in the background.
-   *
-   * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
-   * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
-   * membership in the group.
-   *
-   * @param memberData a supplier of the data to store in the member node
-   * @param onLoseMembership a callback to notify when membership is lost
-   * @return a Membership object with the member details
-   * @throws JoinException if there was a problem joining the group
-   * @throws InterruptedException if this thread is interrupted awaiting completion of the join
-   */
-  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(memberData);
-    ensurePersistentGroupPath();
-
-    final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
-    return backoffHelper.doUntilResult(() -> {
-      try {
-        return groupJoiner.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to join group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to join group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to join group at path: " + path, e);
-          return null;
-        } else {
-          throw new JoinException("Problem joining partition group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        ZooKeeperUtils.ensurePath(zkClient, acl, path);
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-        return false;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error ensuring path: " + path, e);
-          return false;
-        } else {
-          throw new JoinException("Problem ensuring group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private class ActiveMembership implements Membership {
-    private final Supplier<byte[]> memberData;
-    private final Command onLoseMembership;
-    private String nodePath;
-    private String memberId;
-    private volatile boolean cancelled;
-    private byte[] membershipData;
-
-    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
-      this.memberData = memberData;
-      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
-    }
-
-    @Override
-    public String getGroupPath() {
-      return path;
-    }
-
-    @Override
-    public synchronized String getMemberId() {
-      return memberId;
-    }
-
-    @Override
-    public synchronized String getMemberPath() {
-      return nodePath;
-    }
-
-    @Override
-    public synchronized byte[] updateMemberData() throws UpdateException {
-      byte[] membershipData = memberData.get();
-      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
-        try {
-          zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
-          this.membershipData = membershipData;
-        } catch (KeeperException e) {
-          throw new UpdateException("Problem updating membership data.", e);
-        } catch (InterruptedException e) {
-          throw new UpdateException("Interrupted attempting to update membership data.", e);
-        } catch (ZooKeeperConnectionException e) {
-          throw new UpdateException(
-              "Could not connect to the ZooKeeper cluster to update membership data.", e);
-        }
-      }
-      return membershipData;
-    }
-
-    @Override
-    public synchronized void cancel() throws JoinException {
-      if (!cancelled) {
-        try {
-          backoffHelper.doUntilSuccess(() -> {
-            try {
-              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
-              return true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
-            } catch (ZooKeeperConnectionException e) {
-              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-              return false;
-            } catch (NoNodeException e) {
-              LOG.info("Membership already cancelled, node at path: " + nodePath +
-                       " has been deleted");
-              return true;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.warn("Temporary error cancelling membership: " + nodePath, e);
-                return false;
-              } else {
-                throw new JoinException("Problem cancelling membership: " + nodePath, e);
-              }
-            }
-          });
-          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Problem cancelling membership: " + nodePath, e);
-        }
-      }
-    }
-
-    private class CancelledException extends IllegalStateException { /* marker */ }
-
-    synchronized Membership join()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (cancelled) {
-        throw new CancelledException();
-      }
-
-      if (nodePath == null) {
-        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
-        // registered once.
-        zkClient.registerExpirationHandler(this::tryJoin);
-      }
-
-      byte[] membershipData = memberData.get();
-      String nodeName = nodeScheme.createName(membershipData);
-      CreateMode createMode = nodeScheme.isSequential()
-          ? CreateMode.EPHEMERAL_SEQUENTIAL
-          : CreateMode.EPHEMERAL;
-      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
-      memberId = Group.this.getMemberId(nodePath);
-      LOG.info("Set group member ID to " + memberId);
-      this.membershipData = membershipData;
-
-      // Re-join if our ephemeral node goes away due to maliciousness.
-      zkClient.get().exists(nodePath, event -> {
-        if (event.getType() == EventType.NodeDeleted) {
-          tryJoin();
-        }
-      });
-
-      return this;
-    }
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
-        () -> {
-          try {
-            join();
-            return true;
-          } catch (CancelledException e) {
-            // Lost a cancel race - that's ok.
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-joining group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
-            }
-          }
-        };
-
-    private synchronized void tryJoin() {
-      onLoseMembership.execute();
-      try {
-        backoffHelper.doUntilSuccess(tryJoin);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
-      }
-    }
-  }
-
-  /**
-   * An interface to an object that listens for changes to a group's membership.
-   */
-  public interface GroupChangeListener {
-
-    /**
-     * Called whenever group membership changes with the new list of member ids.
-     *
-     * @param memberIds the current member ids
-     */
-    void onGroupChange(Iterable<String> memberIds);
-  }
-
-  /**
-   * An interface that dictates the scheme to use for storing and filtering nodes that represent
-   * members of a distributed group.
-   */
-  public interface NodeScheme {
-    /**
-     * Determines if a child node is a member of a group by examining the node's name.
-     *
-     * @param nodeName the name of a child node found in a group
-     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
-     */
-    boolean isMember(String nodeName);
-
-    /**
-     * Generates a node name for the node representing this process in the distributed group.
-     *
-     * @param membershipData the data that will be stored in this node
-     * @return the name for the node that will represent this process in the group
-     */
-    String createName(byte[] membershipData);
-
-    /**
-     * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
-     *
-     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
-     */
-    boolean isSequential();
-  }
-
-  /**
-   * Indicates an error watching a group.
-   */
-  public static class WatchException extends Exception {
-    public WatchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Watches this group for the lifetime of this jvm process.  This method will block until the
-   * current group members are available, notify the {@code groupChangeListener} and then return.
-   * All further changes to the group membership will cause notifications on a background thread.
-   *
-   * @param groupChangeListener the listener to notify of group membership change events
-   * @return A command which, when executed, will stop watching the group.
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
-   */
-  public final Command watch(final GroupChangeListener groupChangeListener)
-      throws WatchException, InterruptedException {
-    Preconditions.checkNotNull(groupChangeListener);
-
-    try {
-      ensurePersistentGroupPath();
-    } catch (JoinException e) {
-      throw new WatchException("Failed to create group path: " + path, e);
-    }
-
-    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        groupMonitor.watchGroup();
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new WatchException("Interrupted trying to watch group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to watch group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to watch group at path: " + path, e);
-          return null;
-        } else {
-          throw new WatchException("Problem trying to watch group at path: " + path, e);
-        }
-      }
-    });
-    return groupMonitor::stopWatching;
-  }
-
-  /**
-   * Helps continuously monitor a group for membership changes.
-   */
-  private class GroupMonitor {
-    private final GroupChangeListener groupChangeListener;
-    private volatile boolean stopped = false;
-    private Set<String> members;
-
-    GroupMonitor(GroupChangeListener groupChangeListener) {
-      this.groupChangeListener = groupChangeListener;
-    }
-
-    private final Watcher groupWatcher = event -> {
-      if (event.getType() == EventType.NodeChildrenChanged) {
-        tryWatchGroup();
-      }
-    };
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
-        () -> {
-          try {
-            watchGroup();
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-watching group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
-            }
-          }
-        };
-
-    private void tryWatchGroup() {
-      if (stopped) {
-        return;
-      }
-
-      try {
-        backoffHelper.doUntilSuccess(tryWatchGroup);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
-      }
-    }
-
-    private void watchGroup()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (stopped) {
-        return;
-      }
-
-      List<String> children = zkClient.get().getChildren(path, groupWatcher);
-      setMembers(Iterables.filter(children, nodeNameFilter));
-    }
-
-    private void stopWatching() {
-      // TODO(William Farner): Cancel the watch when
-      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
-      LOG.info("Stopping watch on " + this);
-      stopped = true;
-    }
-
-    synchronized void setMembers(Iterable<String> members) {
-      if (stopped) {
-        LOG.info("Suppressing membership update, no longer watching " + this);
-        return;
-      }
-
-      if (this.members == null) {
-        // Reset our watch on the group if session expires - only needs to be registered once.
-        zkClient.registerExpirationHandler(this::tryWatchGroup);
-      }
-
-      Set<String> membership = ImmutableSet.copyOf(members);
-      if (!membership.equals(this.members)) {
-        groupChangeListener.onGroupChange(members);
-        this.members = membership;
-      }
-    }
-  }
-
-  /**
-   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
-   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
-   * prefix is "member_", the node's full path will look something like
-   * {@code /discovery/servicename/member_0000000007}.
-   */
-  public static class DefaultScheme implements NodeScheme {
-    private final String namePrefix;
-    private final Pattern namePattern;
-
-    /**
-     * Creates a sequential node scheme based on the given node name prefix.
-     *
-     * @param namePrefix the prefix for the names of the member nodes
-     */
-    public DefaultScheme(String namePrefix) {
-      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
-      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
-    }
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return namePattern.matcher(nodeName).matches();
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return namePrefix;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return true;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Group " + path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
deleted file mode 100644
index aeea02d..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-
-/**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
- * common service to advertise their presence to server-set protocol-aware clients.
- *
- * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
- * rendezvous data to zookeeper so that standard clients can interoperate.
- */
-public interface ServerSet {
-
-  /**
-   * Encodes a {@link ServiceInstance} as a JSON object.
-   *
-   * This is the default encoding for service instance data in ZooKeeper.
-   */
-  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their logical name
-   * @return an EndpointStatus object that allows the endpoint to adjust its status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException;
-
-  /**
-   * A handle to a service endpoint's status data that allows updating it to track current events.
-   */
-  interface EndpointStatus {
-
-    /**
-     * Removes the endpoint from the server set.
-     *
-     * @throws UpdateException if there was a problem leaving the ServerSet.
-     */
-    void leave() throws UpdateException;
-  }
-
-  /**
-   * Indicates an error updating a service's status information.
-   */
-  class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index ace4980..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
- */
-public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
-  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
-
-  private final ZooKeeperClient zkClient;
-  private final Group group;
-  private final Codec<ServiceInstance> codec;
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a new ServerSet using open ZooKeeper node ACLs.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
-    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
-  }
-
-  /**
-   * Creates a new ServerSet for the given service {@code path}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
-   *     from a byte array
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
-    this.zkClient = checkNotNull(zkClient);
-    this.group = checkNotNull(group);
-    this.codec = checkNotNull(codec);
-
-    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
-    backoffHelper = new BackoffHelper();
-  }
-
-  @VisibleForTesting
-  ZooKeeperClient getZkClient() {
-    return zkClient;
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws Group.JoinException, InterruptedException {
-
-    checkNotNull(endpoint);
-    checkNotNull(additionalEndpoints);
-
-    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
-    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
-    Group.Membership membership = group.join(serviceInstanceSupplier);
-
-    return () -> memberStatus.leave(membership);
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
-    try {
-      return serverSetWatcher.watch();
-    } catch (Group.WatchException e) {
-      throw new MonitorException("ZooKeeper watch failed.", e);
-    } catch (InterruptedException e) {
-      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
-    }
-  }
-
-  private class MemberStatus {
-    private final InetSocketAddress endpoint;
-    private final Map<String, InetSocketAddress> additionalEndpoints;
-
-    private MemberStatus(
-        InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints) {
-
-      this.endpoint = endpoint;
-      this.additionalEndpoints = additionalEndpoints;
-    }
-
-    synchronized void leave(Group.Membership membership) throws UpdateException {
-      try {
-        membership.cancel();
-      } catch (Group.JoinException e) {
-        throw new UpdateException(
-            "Failed to auto-cancel group membership on transition to DEAD status", e);
-      }
-    }
-
-    byte[] serializeServiceInstance() {
-      ServiceInstance serviceInstance = new ServiceInstance(
-          ServerSets.toEndpoint(endpoint),
-          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
-          Status.ALIVE);
-
-      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
-      try {
-        return ServerSets.serializeServiceInstance(serviceInstance, codec);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
-            serviceInstance + "to a byte[]", e);
-      }
-    }
-  }
-
-  private static class ServiceInstanceFetchException extends RuntimeException {
-    ServiceInstanceFetchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private static class ServiceInstanceDeletedException extends RuntimeException {
-    ServiceInstanceDeletedException(String path) {
-      super(path);
-    }
-  }
-
-  private class ServerSetWatcher {
-    private final ZooKeeperClient zkClient;
-    private final HostChangeMonitor<ServiceInstance> monitor;
-    @Nullable private ImmutableSet<ServiceInstance> serverSet;
-
-    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
-      this.zkClient = zkClient;
-      this.monitor = monitor;
-    }
-
-    public Command watch() throws Group.WatchException, InterruptedException {
-      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
-
-      try {
-        return group.watch(this::notifyGroupChange);
-      } catch (Group.WatchException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      } catch (InterruptedException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      }
-    }
-
-    private ServiceInstance getServiceInstance(final String nodePath) {
-      try {
-        return backoffHelper.doUntilResult(() -> {
-          try {
-            byte[] data = zkClient.get().getData(nodePath, false, null);
-            return ServerSets.deserializeServiceInstance(data, codec);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ServiceInstanceFetchException(
-                "Interrupted updating service data for: " + nodePath, e);
-          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            LOG.warn("Temporary error trying to updating service data for: " + nodePath, e);
-            return null;
-          } catch (NoNodeException e) {
-            invalidateNodePath(nodePath);
-            throw new ServiceInstanceDeletedException(nodePath);
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
-              return null;
-            } else {
-              throw new ServiceInstanceFetchException(
-                  "Failed to update service data for: " + nodePath, e);
-            }
-          } catch (IOException e) {
-            throw new ServiceInstanceFetchException(
-                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
-          }
-        });
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceInstanceFetchException(
-            "Interrupted trying to update service data for: " + nodePath, e);
-      }
-    }
-
-    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
-        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
-          @Override public ServiceInstance load(String memberId) {
-            return getServiceInstance(group.getMemberPath(memberId));
-          }
-        });
-
-    private void rebuildServerSet() {
-      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
-      servicesByMemberId.invalidateAll();
-      notifyGroupChange(memberIds);
-    }
-
-    private String invalidateNodePath(String deletedPath) {
-      String memberId = group.getMemberId(deletedPath);
-      servicesByMemberId.invalidate(memberId);
-      return memberId;
-    }
-
-    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
-        memberId -> {
-          // This get will trigger a fetch
-          try {
-            return servicesByMemberId.getUnchecked(memberId);
-          } catch (UncheckedExecutionException e) {
-            Throwable cause = e.getCause();
-            if (!(cause instanceof ServiceInstanceDeletedException)) {
-              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
-              throw new IllegalStateException(
-                  "Unexpected error fetching member data for: " + memberId, e);
-            }
-            return null;
-          }
-        };
-
-    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
-      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
-      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
-
-      // Ignore no-op state changes except for the 1st when we've seen no group yet.
-      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
-        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
-        // Implicit removal from servicesByMemberId.
-        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
-
-        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
-            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
-
-        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
-      }
-    }
-
-    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
-      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
-      // if the server's status has not changed, we can skip any onChange updates.
-      if (!currentServerSet.equals(serverSet)) {
-        if (currentServerSet.isEmpty()) {
-          LOG.warn("server set empty for path " + group.getPath());
-        } else {
-          if (serverSet == null) {
-            LOG.info("received initial membership {}", currentServerSet);
-          } else {
-            logChange(currentServerSet);
-          }
-        }
-        serverSet = currentServerSet;
-        monitor.onChange(serverSet);
-      }
-    }
-
-    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
-      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
-      if (serverSet.size() != newServerSet.size()) {
-        message.append("from ").append(serverSet.size())
-            .append(" members to ").append(newServerSet.size());
-      }
-
-      Joiner joiner = Joiner.on("\n\t\t");
-
-      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
-      if (!left.isEmpty()) {
-        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
-      }
-
-      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
-      if (!joined.isEmpty()) {
-        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
-      }
-
-      LOG.info(message.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
deleted file mode 100644
index 01a54a5..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Common ServerSet related functions
- */
-public class ServerSets {
-
-  private ServerSets() {
-    // Utility class.
-  }
-
-  /**
-   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
-   */
-  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
-      ServerSets::toEndpoint;
-
-  /**
-   * Creates a server set that registers at a single path applying the given ACL to all nodes
-   * created in the path.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
-   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, java.util.Set)
-   * @return A server set that registers at {@code zkPath}.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
-    Preconditions.checkNotNull(zkClient);
-    MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPath);
-
-    return new ServerSetImpl(zkClient, acl, zkPath);
-  }
-
-  /**
-   * Returns a serialized Thrift service instance object, with given endpoints and codec.
-   *
-   * @param serviceInstance the Thrift service instance object to be serialized
-   * @param codec the codec to use to serialize a Thrift service instance object
-   * @return byte array that contains a serialized Thrift service instance
-   */
-  public static byte[] serializeServiceInstance(
-      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
-
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    codec.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  /**
-   * Serializes a service instance based on endpoints.
-   * @see #serializeServiceInstance(ServiceInstance, Codec)
-   *
-   * @param address the target address of the service instance
-   * @param additionalEndpoints additional endpoints of the service instance
-   * @param status service status
-   */
-  public static byte[] serializeServiceInstance(
-      InetSocketAddress address,
-      Map<String, Endpoint> additionalEndpoints,
-      Status status,
-      Codec<ServiceInstance> codec) throws IOException {
-
-    ServiceInstance serviceInstance =
-        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
-    return serializeServiceInstance(serviceInstance, codec);
-  }
-
-  /**
-   * Creates a service instance object deserialized from byte array.
-   *
-   * @param data the byte array contains a serialized Thrift service instance
-   * @param codec the codec to use to deserialize the byte array
-   */
-  public static ServiceInstance deserializeServiceInstance(
-      byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
-    return codec.deserialize(new ByteArrayInputStream(data));
-  }
-
-  /**
-   * Creates an endpoint for the given InetSocketAddress.
-   *
-   * @param address the target address to create the endpoint for
-   */
-  public static Endpoint toEndpoint(InetSocketAddress address) {
-    return new Endpoint(address.getHostName(), address.getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
deleted file mode 100644
index d9978a9..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
-
-public class SingletonServiceImpl implements SingletonService {
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
-  /**
-   * Creates a candidate that can be combined with an existing server set to form a singleton
-   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a single zk path.
-   */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
-  }
-
-  private final ServerSet serverSet;
-  private final Candidate candidate;
-
-  /**
-   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
-   */
-  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
-  }
-
-  @Override
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws LeadException, InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    try {
-      candidate.offerLeadership(new Leader() {
-        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
-          listener.onLeading(new LeaderControl() {
-            ServerSet.EndpointStatus endpointStatus = null;
-            final AtomicBoolean left = new AtomicBoolean(false);
-
-            // Methods are synchronized to prevent simultaneous invocations.
-            @Override public synchronized void advertise()
-                throws AdvertiseException, InterruptedException {
-
-              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
-              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
-              try {
-                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-              } catch (JoinException e) {
-                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
-              }
-            }
-
-            @Override public synchronized void leave() throws LeaveException {
-              Preconditions.checkState(left.compareAndSet(false, true),
-                  "Cannot leave more than once.");
-              if (endpointStatus != null) {
-                try {
-                  endpointStatus.leave();
-                } catch (ServerSet.UpdateException e) {
-                  throw new LeaveException("Problem updating endpoint status for abdicating leader " +
-                      "at endpoint " + endpoint, e);
-                }
-              }
-              try {
-                abdicate.execute();
-              } catch (JoinException e) {
-                throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
-              }
-            }
-          });
-        }
-
-        @Override public void onDefeated() {
-          listener.onDefeated();
-        }
-      });
-    } catch (JoinException e) {
-      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
-    } catch (Group.WatchException e) {
-      throw new LeadException("Problem getting initial membership list for leadership group.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
deleted file mode 100644
index ce243fb..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.InetSocketAddressHelper;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages a connection to a ZooKeeper cluster.
- */
-public class ZooKeeperClient {
-
-  /**
-   * Indicates an error connecting to a zookeeper cluster.
-   */
-  public class ZooKeeperConnectionException extends Exception {
-    ZooKeeperConnectionException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private final class SessionState {
-    private final long sessionId;
-    private final byte[] sessionPasswd;
-
-    private SessionState(long sessionId, byte[] sessionPasswd) {
-      this.sessionId = sessionId;
-      this.sessionPasswd = sessionPasswd;
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
-
-  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
-
-  private final int sessionTimeoutMs;
-  private final Optional<Credentials> credentials;
-  private final String zooKeeperServers;
-  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
-  // made from within long synchronized blocks.
-  private volatile ZooKeeper zooKeeper;
-  private SessionState sessionState;
-
-  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
-  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
-
-  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
-      InetSocketAddress... addresses) {
-    return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get()}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
-      InetSocketAddress... zooKeeperServers) {
-    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-    this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get()}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
-      InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
-    this(sessionTimeout,
-        Optional.of(credentials),
-        Optional.absent(),
-        combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-        this(sessionTimeout,
-            Optional.of(credentials),
-            Optional.absent(),
-            zooKeeperServers);
-      }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param chrootPath an optional chroot path
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
-      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
-    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
-    this.credentials = Preconditions.checkNotNull(credentials);
-
-    if (chrootPath.isPresent()) {
-      PathUtils.validatePath(chrootPath.get());
-    }
-
-    Preconditions.checkNotNull(zooKeeperServers);
-    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
-        "Must present at least 1 ZK server");
-
-    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
-      @Override
-      public void run() {
-        while (true) {
-          try {
-            WatchedEvent event = eventQueue.take();
-            for (Watcher watcher : watchers) {
-              watcher.process(event);
-            }
-          } catch (InterruptedException e) { /* ignore */ }
-        }
-      }
-    };
-    watcherProcessor.setDaemon(true);
-    watcherProcessor.start();
-
-    Iterable<String> servers =
-        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
-            InetSocketAddressHelper::toString);
-    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none has yet been
-   * established or a previous connection was disconnected or had its session time out.  This method
-   * will attempt to re-use sessions when possible.  Equivalent to:
-   * <pre>get(Amount.of(0L, ...)</pre>.
-   *
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a connection to be established
-   */
-  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
-    try {
-      return get(WAIT_FOREVER);
-    } catch (TimeoutException e) {
-      InterruptedException interruptedException =
-          new InterruptedException("Got an unexpected TimeoutException for 0 wait");
-      interruptedException.initCause(e);
-      throw interruptedException;
-    }
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none has yet been
-   * established or a previous connection was disconnected or had its session time out.  This
-   * method will attempt to re-use sessions when possible.
-   *
-   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
-   *     cluster to be established; 0 to wait forever
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a connection to be established
-   * @throws TimeoutException if a connection could not be established within the configured
-   *     session timeout
-   */
-  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
-      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
-
-    if (zooKeeper == null) {
-      final CountDownLatch connected = new CountDownLatch(1);
-      Watcher watcher = event -> {
-        switch (event.getType()) {
-          // Guard the None type since this watch may be used as the default watch on calls by
-          // the client outside our control.
-          case None:
-            switch (event.getState()) {
-              case Expired:
-                LOG.info("Zookeeper session expired. Event: " + event);
-                close();
-                break;
-              case SyncConnected:
-                connected.countDown();
-                break;
-            }
-        }
-
-        eventQueue.offer(event);
-      };
-
-      try {
-        zooKeeper = (sessionState != null)
-          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
-            sessionState.sessionPasswd)
-          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
-      } catch (IOException e) {
-        throw new ZooKeeperConnectionException(
-            "Problem connecting to servers: " + zooKeeperServers, e);
-      }
-
-      if (connectionTimeout.getValue() > 0) {
-        if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
-          close();
-          throw new TimeoutException("Timed out waiting for a ZK connection after "
-                                     + connectionTimeout);
-        }
-      } else {
-        try {
-          connected.await();
-        } catch (InterruptedException ex) {
-          LOG.info("Interrupted while waiting to connect to zooKeeper");
-          close();
-          throw ex;
-        }
-      }
-      if (credentials.isPresent()) {
-        Credentials zkCredentials = credentials.get();
-        zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
-      }
-
-      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
-    }
-    return zooKeeper;
-  }
-
-  /**
-   * Clients that need to re-establish state after session expiration can register an
-   * {@code onExpired} command to execute.
-   *
-   * @param onExpired the {@code Command} to register
-   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
-   *     removal.
-   */
-  public Watcher registerExpirationHandler(final Command onExpired) {
-    Watcher watcher = event -> {
-      if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
-        onExpired.execute();
-      }
-    };
-    register(watcher);
-    return watcher;
-  }
-
-  /**
-   * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
-   * registered {@code watcher} will remain registered across re-connects and session expiration
-   * events.
-   *
-   * @param watcher the {@code Watcher to register}
-   */
-  public void register(Watcher watcher) {
-    watchers.add(watcher);
-  }
-
-  /**
-   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
-   * registered.
-   *
-   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
-   * @return whether the given {@code Watcher} was found and removed from the active set
-   */
-  public boolean unregister(Watcher watcher) {
-    return watchers.remove(watcher);
-  }
-
-  /**
-   * Checks to see if the client might reasonably re-try an operation given the exception thrown
-   * while attempting it.  If the ZooKeeper session should be expired to enable the re-try to
-   * succeed this method will expire it as a side-effect.
-   *
-   * @param e the exception to test
-   * @return true if a retry can be attempted
-   */
-  public boolean shouldRetry(KeeperException e) {
-    if (e instanceof SessionExpiredException) {
-      close();
-    }
-    return ZooKeeperUtils.isRetryable(e);
-  }
-
-  /**
-   * Closes the current connection if any expiring the current ZooKeeper session.  Any subsequent
-   * calls to this method will no-op until the next successful {@link #get}.
-   */
-  public synchronized void close() {
-    if (zooKeeper != null) {
-      try {
-        zooKeeper.close();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.warn("Interrupted trying to close zooKeeper");
-      } finally {
-        zooKeeper = null;
-        sessionState = null;
-      }
-    }
-  }
-
-  @VisibleForTesting
-  synchronized boolean isClosed() {
-    return zooKeeper == null;
-  }
-
-  @VisibleForTesting
-  ZooKeeper getZooKeeperClientForTests() {
-    return zooKeeper;
-  }
-}


[2/3] aurora git commit: Remove legacy commons ZK code

Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
index 93ddd89..f091384 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -13,29 +13,19 @@
  */
 package org.apache.aurora.common.zookeeper;
 
-import java.util.List;
-
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Utilities for dealing with zoo keeper.
+ * Utilities for dealing with ZooKeeper.
  */
 public final class ZooKeeperUtils {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
-
   /**
    * An appropriate default session timeout for Twitter ZooKeeper clusters.
    */
@@ -44,12 +34,6 @@ public final class ZooKeeperUtils {
   public static final Amount<Integer,Time> DEFAULT_ZK_CONNECTION_TIMEOUT = Amount.of(10, Time.SECONDS);
 
   /**
-   * The magic version number that allows any mutation to always succeed regardless of actual
-   * version number.
-   */
-  public static final int ANY_VERSION = -1;
-
-  /**
    * An ACL that gives all permissions any user authenticated or not.
    */
   public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
@@ -65,99 +49,13 @@ public final class ZooKeeperUtils {
           .build();
 
   /**
-   * Returns true if the given exception indicates an error that can be resolved by retrying the
-   * operation without modification.
-   *
-   * @param e the exception to check
-   * @return true if the causing operation is strictly retryable
-   */
-  public static boolean isRetryable(KeeperException e) {
-    Preconditions.checkNotNull(e);
-
-    switch (e.code()) {
-      case CONNECTIONLOSS:
-      case SESSIONEXPIRED:
-      case SESSIONMOVED:
-      case OPERATIONTIMEOUT:
-        return true;
-
-      case RUNTIMEINCONSISTENCY:
-      case DATAINCONSISTENCY:
-      case MARSHALLINGERROR:
-      case BADARGUMENTS:
-      case NONODE:
-      case NOAUTH:
-      case BADVERSION:
-      case NOCHILDRENFOREPHEMERALS:
-      case NODEEXISTS:
-      case NOTEMPTY:
-      case INVALIDCALLBACK:
-      case INVALIDACL:
-      case AUTHFAILED:
-      case UNIMPLEMENTED:
-
-      // These two should not be encountered - they are used internally by ZK to specify ranges
-      case SYSTEMERROR:
-      case APIERROR:
-
-      case OK: // This is actually an invalid ZK exception code
-
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}.  If the
-   * path already exists, nothing is done; however if any portion of the path is missing, it will be
-   * created with the given {@code acl} as a persistent zookeeper node.  The given {@code path} must
-   * be a valid zookeeper absolute path.
-   *
-   * @param zkClient the client to use to access the ZK cluster
-   * @param acl the acl to use if creating path nodes
-   * @param path the path to ensure exists
-   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
-   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
-   * @throws KeeperException if there was a problem in ZK
-   */
-  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
-      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-    Preconditions.checkNotNull(zkClient);
-    Preconditions.checkNotNull(path);
-    Preconditions.checkArgument(path.startsWith("/"));
-
-    ensurePathInternal(zkClient, acl, path);
-  }
-
-  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
-      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-    if (zkClient.get().exists(path, false) == null) {
-      // The current path does not exist; so back up a level and ensure the parent path exists
-      // unless we're already a root-level path.
-      int lastPathIndex = path.lastIndexOf('/');
-      if (lastPathIndex > 0) {
-        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
-      }
-
-      // We've ensured our parent path (if any) exists so we can proceed to create our path.
-      try {
-        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
-      } catch (KeeperException.NodeExistsException e) {
-        // This ensures we don't die if a race condition was met between checking existence and
-        // trying to create the node.
-        LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
-      }
-    }
-  }
-
-  /**
    * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
    * never ends with a slash (except for root path).
    *
    * @param path the path to be normalized
    * @return normalized path string
    */
-  public static String normalizePath(String path) {
+  static String normalizePath(String path) {
     String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
     PathUtils.validatePath(normalizedPath);
     return normalizedPath;

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
deleted file mode 100644
index ba09279..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper.testing;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-
-/**
- * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
- */
-public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
-
-  private final Amount<Integer, Time> defaultSessionTimeout;
-
-  /**
-   * Creates a test case where the test server uses its
-   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
-   * session timeout.
-   */
-  public BaseZooKeeperClientTest() {
-    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
-  }
-
-  /**
-   * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
-   * clients created without an explicit session timeout.
-   */
-  public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
-    this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
-  }
-
-
-  /**
-   * Starts zookeeper back up on the last used port.
-   */
-  protected final void restartNetwork() throws IOException, InterruptedException {
-    getServer().restartNetwork();
-  }
-
-  /**
-   * Shuts down the in-process zookeeper network server.
-   */
-  protected final void shutdownNetwork() {
-    getServer().shutdownNetwork();
-  }
-
-  /**
-   * Expires the active session for the given client.  The client should be one returned from
-   * {@link #createZkClient}.
-   *
-   * @param zkClient the client to expire
-   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
-   *    the local zk server while trying to expire the session
-   * @throws InterruptedException if interrupted while requesting expiration
-   */
-  protected final void expireSession(ZooKeeperClient zkClient)
-      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
-    getServer().expireClientSession(zkClient.get().getSessionId());
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  protected final int getPort() {
-    return getServer().getPort();
-  }
-
-  /**
-   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
-   * with the default session timeout.
-   */
-  protected final ZooKeeperClient createZkClient() {
-    return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout.
-   */
-  protected final ZooKeeperClient createZkClient(Credentials credentials) {
-    return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout.  The client is authenticated in the digest authentication scheme
-   * with the given {@code username} and {@code password}.
-   */
-  protected final ZooKeeperClient createZkClient(String username, String password) {
-    return createZkClient(Credentials.digestCredentials(username, password));
-  }
-
-  /**
-   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
-   * with a custom {@code sessionTimeout}.
-   */
-  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
-    return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout and the custom chroot path.
-   */
-  protected final ZooKeeperClient createZkClient(String chrootPath) {
-    return createZkClient(defaultSessionTimeout, Optional.absent(),
-        Optional.of(chrootPath));
-  }
-
-  private ZooKeeperClient createZkClient(
-      Amount<Integer, Time> sessionTimeout,
-      Optional<Credentials> credentials,
-      Optional<String> chrootPath) {
-
-    ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
-        ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
-    addTearDown(client::close);
-    return client;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
index 29204cd..a4504b8 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -19,8 +19,6 @@ import java.net.InetSocketAddress;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -34,8 +32,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  */
 public class ZooKeeperTestServer {
 
-  static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
-
   private final File dataDir;
   private final File snapDir;
 
@@ -91,7 +87,7 @@ public class ZooKeeperTestServer {
   /**
    * Shuts down the in-process zookeeper network server.
    */
-  final void shutdownNetwork() {
+  private void shutdownNetwork() {
     if (connectionFactory != null) {
       connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
       connectionFactory = null;

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
deleted file mode 100644
index 9c0cebe..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-public class CandidateImplTest extends BaseZooKeeperClientTest {
-  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
-  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
-
-  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
-
-  @Before
-  public void mySetUp() throws IOException {
-    candidateBuffer = new LinkedBlockingDeque<>();
-  }
-
-  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
-    return new Group(zkClient, ACL, SERVICE);
-  }
-
-  private class Reign implements Candidate.Leader {
-    private ExceptionalCommand<Group.JoinException> abdicate;
-    private final CandidateImpl candidate;
-    private final String id;
-    private CountDownLatch defeated = new CountDownLatch(1);
-
-    Reign(String id, CandidateImpl candidate) {
-      this.id = id;
-      this.candidate = candidate;
-    }
-
-    @Override
-    public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
-      candidateBuffer.offerFirst(candidate);
-      this.abdicate = abdicate;
-    }
-
-    @Override
-    public void onDefeated() {
-      defeated.countDown();
-    }
-
-    public void abdicate() throws Group.JoinException {
-      Preconditions.checkState(abdicate != null);
-      abdicate.execute();
-    }
-
-    public void expectDefeated() throws InterruptedException {
-      defeated.await();
-    }
-
-    @Override
-    public String toString() {
-      return id;
-    }
-  }
-
-  @Test
-  public void testOfferLeadership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
-      @Override public String toString() {
-        return "Leader1";
-      }
-    };
-    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
-      @Override public String toString() {
-        return "Leader2";
-      }
-    };
-    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
-      @Override public String toString() {
-        return "Leader3";
-      }
-    };
-
-    Reign candidate1Reign = new Reign("1", candidate1);
-    Reign candidate2Reign = new Reign("2", candidate2);
-    Reign candidate3Reign = new Reign("3", candidate3);
-
-    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
-    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
-    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
-
-    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
-        candidate1Leader.get());
-
-    shutdownNetwork();
-    restartNetwork();
-
-    assertTrue("A re-connect without a session expiration should leave the leader elected",
-        candidate1Leader.get());
-
-    candidate1Reign.abdicate();
-    assertSame(candidate1, candidateBuffer.takeLast());
-    assertFalse(candidate1Leader.get());
-    // Active abdication should trigger defeat.
-    candidate1Reign.expectDefeated();
-
-    CandidateImpl secondCandidate = candidateBuffer.takeLast();
-    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
-               + candidateBuffer,
-        candidate2Leader.get() ^ candidate3Leader.get());
-
-    if (secondCandidate == candidate2) {
-      expireSession(zkClient2);
-      assertSame(candidate3, candidateBuffer.takeLast());
-      assertTrue(candidate3Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate2Reign.expectDefeated();
-    } else {
-      expireSession(zkClient3);
-      assertSame(candidate2, candidateBuffer.takeLast());
-      assertTrue(candidate2Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate3Reign.expectDefeated();
-    }
-  }
-
-  @Test
-  public void testEmptyMembership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
-    Reign candidate1Reign = new Reign("1", candidate1);
-
-    candidate1.offerLeadership(candidate1Reign);
-    assertSame(candidate1, candidateBuffer.takeLast());
-    candidate1Reign.abdicate();
-    assertFalse(candidate1.getLeaderData().isPresent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
new file mode 100644
index 0000000..16c0171
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EncodingTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    InetSocketAddress endpoint = new InetSocketAddress(12345);
+    Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
+    Status status = Status.ALIVE;
+
+    byte[] data = Encoding.serializeServiceInstance(
+        endpoint, additionalEndpoints, status, Encoding.JSON_CODEC);
+
+    ServiceInstance instance = Encoding.deserializeServiceInstance(data, Encoding.JSON_CODEC);
+
+    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+    assertEquals(Status.ALIVE, instance.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
deleted file mode 100644
index 97a42d1..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.NodeScheme;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
-
-public class GroupTest extends BaseZooKeeperClientTest {
-
-  private ZooKeeperClient zkClient;
-  private Group joinGroup;
-  private Group watchGroup;
-  private Command stopWatching;
-  private Command onLoseMembership;
-
-  private RecordingListener listener;
-
-  public GroupTest() {
-    super(Amount.of(1, Time.DAYS));
-  }
-
-  @Before
-  public void mySetUp() throws Exception {
-    onLoseMembership = createMock(Command.class);
-
-    zkClient = createZkClient("group", "test");
-    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-
-    listener = new RecordingListener();
-    stopWatching = watchGroup.watch(listener);
-  }
-
-  private static class RecordingListener implements GroupChangeListener {
-    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
-        new LinkedBlockingQueue<Iterable<String>>();
-
-    @Override
-    public void onGroupChange(Iterable<String> memberIds) {
-      membershipChanges.add(memberIds);
-    }
-
-    public Iterable<String> take() throws InterruptedException {
-      return membershipChanges.take();
-    }
-
-    public void assertEmpty() {
-      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
-    }
-
-    @Override
-    public String toString() {
-      return membershipChanges.toString();
-    }
-  }
-
-  private static class CustomScheme implements NodeScheme {
-    static final String NODE_NAME = "custom_name";
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return NODE_NAME.equals(nodeName);
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return NODE_NAME;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return false;
-    }
-  }
-
-  @Test
-  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    expireSession(zkClient);
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    membership.cancel();
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join();
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    shutdownNetwork();
-    restartNetwork();
-
-    // The member should still be present under existing ephemeral node since session did not
-    // expire.
-    watchGroup.watch(listener);
-    assertMembershipObserved(originalMemberId);
-
-    membership.cancel();
-
-    assertEmptyMembershipObserved();
-    assertEmptyMembershipObserved(); // and again for 2nd listener
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
-    onLoseMembership.execute();
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    expireSession(zkClient);
-
-    // We should have lost our group membership and then re-gained it with a new ephemeral node.
-    // We may or may-not see the intermediate state change but we must see the final state
-    Iterable<String> members = listener.take();
-    if (Iterables.isEmpty(members)) {
-      members = listener.take();
-    }
-    assertEquals(1, Iterables.size(members));
-    assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
-    assertNotEquals(originalMemberId, membership.getMemberId());
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinCustomNamingScheme() throws Exception {
-    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
-        new CustomScheme());
-
-    listener = new RecordingListener();
-    group.watch(listener);
-    assertEmptyMembershipObserved();
-
-    Membership membership = group.join();
-    String memberId = membership.getMemberId();
-
-    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
-    assertMembershipObserved(memberId);
-
-    expireSession(zkClient);
-  }
-
-  @Test
-  public void testUpdateMembershipData() throws Exception {
-    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
-
-    byte[] initial = "start".getBytes();
-    expect(dataSupplier.get()).andReturn(initial);
-
-    byte[] second = "update".getBytes();
-    expect(dataSupplier.get()).andReturn(second);
-
-    replay(dataSupplier);
-
-    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
-    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
-        .getData(membership.getMemberPath(), false, null));
-
-    assertArrayEquals("Updating supplier should not change membership data",
-        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    membership.updateMemberData();
-    assertArrayEquals("Updating membership should change data",
-        second, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    verify(dataSupplier);
-  }
-
-  @Test
-  public void testAcls() throws Exception {
-    Group securedMembership =
-        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
-            "/secured/group/membership");
-
-    String memberId = securedMembership.join().getMemberId();
-
-    Group unauthenticatedObserver =
-        new Group(createZkClient(),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthenticatedListener = new RecordingListener();
-    unauthenticatedObserver.watch(unauthenticatedListener);
-
-    assertMembershipObserved(unauthenticatedListener, memberId);
-
-    try {
-      unauthenticatedObserver.join();
-      fail("Expected join exception for unauthenticated observer");
-    } catch (JoinException e) {
-      // expected
-    }
-
-    Group unauthorizedObserver =
-        new Group(createZkClient("joe", "schmoe"),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthorizedListener = new RecordingListener();
-    unauthorizedObserver.watch(unauthorizedListener);
-
-    assertMembershipObserved(unauthorizedListener, memberId);
-
-    try {
-      unauthorizedObserver.join();
-      fail("Expected join exception for unauthorized observer");
-    } catch (JoinException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testStopWatching() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership member1 = joinGroup.join();
-    String memberId1 = member1.getMemberId();
-    assertMembershipObserved(memberId1);
-
-    Membership member2 = joinGroup.join();
-    String memberId2 = member2.getMemberId();
-    assertMembershipObserved(memberId1, memberId2);
-
-    stopWatching.execute();
-
-    member1.cancel();
-    Membership member3 = joinGroup.join();
-    member2.cancel();
-    member3.cancel();
-
-    listener.assertEmpty();
-  }
-
-  private void assertEmptyMembershipObserved() throws InterruptedException {
-    assertMembershipObserved();
-  }
-
-  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
-    assertMembershipObserved(listener, expectedMemberIds);
-  }
-
-  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
-      throws InterruptedException {
-
-    assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
index 2166123..6cf335d 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -52,25 +52,25 @@ public class JsonCodecTest {
         ImmutableMap.of("http", new Endpoint("foo", 8080)),
         Status.ALIVE)
         .setShard(0);
-    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    byte[] data = Encoding.serializeServiceInstance(instance1, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).isSetShard());
 
     ServiceInstance instance2 = new ServiceInstance(
         new Endpoint("foo", 1000),
         ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
         Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance2, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    data = Encoding.serializeServiceInstance(instance2, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
 
     ServiceInstance instance3 = new ServiceInstance(
         new Endpoint("foo", 1000),
         ImmutableMap.<String, Endpoint>of(),
         Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance3, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    data = Encoding.serializeServiceInstance(instance3, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
deleted file mode 100644
index f0c0cb4..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- *
- * TODO(William Farner): Change this to remove thrift dependency.
- */
-public class ServerSetImplTest extends BaseZooKeeperClientTest {
-  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-  private static final String SERVICE = "/twitter/services/puffin_hosebird";
-
-  private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
-  private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
-
-  @Before
-  public void mySetUp() throws IOException {
-    serverSetBuffer = new LinkedBlockingQueue<>();
-    serverSetMonitor = serverSetBuffer::offer;
-  }
-
-  private ServerSetImpl createServerSet() throws IOException {
-    return new ServerSetImpl(createZkClient(), ACL, SERVICE);
-  }
-
-  @Test
-  public void testLifecycle() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-    ServerSet.EndpointStatus status = server.join(
-        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
-
-    ServiceInstance serviceInstance = new ServiceInstance(
-        new Endpoint("foo", 1234),
-        ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-
-    assertChangeFired(serviceInstance);
-
-    status.leave();
-    assertChangeFiredEmpty();
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testMembershipChanges() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-
-    ServerSet.EndpointStatus foo = join(server, "foo");
-    assertChangeFired("foo");
-
-    expireSession(client.getZkClient());
-
-    ServerSet.EndpointStatus bar = join(server, "bar");
-
-    // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a
-    // change, just "foo", "bar" since this was an addition.
-    assertChangeFired("foo", "bar");
-
-    foo.leave();
-    assertChangeFired("bar");
-
-    ServerSet.EndpointStatus baz = join(server, "baz");
-    assertChangeFired("bar", "baz");
-
-    baz.leave();
-    assertChangeFired("bar");
-
-    bar.leave();
-    assertChangeFiredEmpty();
-
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testStopMonitoring() throws Exception {
-    ServerSetImpl client = createServerSet();
-    Command stopMonitoring = client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-
-    ServerSet.EndpointStatus foo = join(server, "foo");
-    assertChangeFired("foo");
-    ServerSet.EndpointStatus bar = join(server, "bar");
-    assertChangeFired("foo", "bar");
-
-    stopMonitoring.execute();
-
-    // No new updates should be received since monitoring has stopped.
-    foo.leave();
-    assertTrue(serverSetBuffer.isEmpty());
-
-    // Expiration event.
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testOrdering() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
-    Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
-    Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
-
-    ServerSetImpl server1 = createServerSet();
-    ServerSetImpl server2 = createServerSet();
-    ServerSetImpl server3 = createServerSet();
-
-    ServiceInstance instance1 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-    ServiceInstance instance2 = new ServiceInstance(
-        new Endpoint("foo", 1001),
-        ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
-        Status.ALIVE);
-    ServiceInstance instance3 = new ServiceInstance(
-        new Endpoint("foo", 1002),
-        ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
-        Status.ALIVE);
-
-    server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
-    assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
-
-    ServerSet.EndpointStatus status2 = server2.join(
-        InetSocketAddress.createUnresolved("foo", 1001),
-        server2Ports);
-    assertEquals(ImmutableList.of(instance1, instance2),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-
-    server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
-    assertEquals(ImmutableList.of(instance1, instance2, instance3),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-
-    status2.leave();
-    assertEquals(ImmutableList.of(instance1, instance3),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-  }
-
-  @Test
-  public void testUnwatchOnException() throws Exception {
-    IMocksControl control = createControl();
-
-    ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
-    Watcher onExpirationWatcher = control.createMock(Watcher.class);
-
-    expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
-        .andReturn(onExpirationWatcher);
-
-    expect(zkClient.get()).andThrow(new InterruptedException());  // See interrupted() note below.
-    expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
-    control.replay();
-
-    Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
-    ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
-
-    try {
-      serverset.watch(hostSet -> {});
-      fail("Expected MonitorException");
-    } catch (DynamicHostSet.MonitorException e) {
-      // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
-      // That call both returns the current interrupted status as well as clearing it.  The clearing
-      // is crucial depending on the order tests are run in this class.  If this test runs before
-      // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
-      // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in
-      // the interruption mechanism and so immediately throws `InterruptedException` based on the
-      // un-cleared interrupted bit.
-      assertTrue(Thread.interrupted());
-    }
-    control.verify();
-  }
-
-  private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
-    return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
-  }
-
-  private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
-      throws JoinException, InterruptedException {
-
-    return serverSet.join(
-        InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
-  }
-
-  private void assertChangeFired(String... serviceHosts)
-      throws InterruptedException {
-
-    assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
-        serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
-            ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
-  }
-
-  protected void assertChangeFiredEmpty() throws InterruptedException {
-    assertChangeFired(ImmutableSet.<ServiceInstance>of());
-  }
-
-  protected void assertChangeFired(ServiceInstance... serviceInstances)
-      throws InterruptedException {
-    assertChangeFired(ImmutableSet.copyOf(serviceInstances));
-  }
-
-  protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
-      throws InterruptedException {
-    assertEquals(serviceInstances, serverSetBuffer.take());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
deleted file mode 100644
index 0e67191..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ServerSetsTest {
-  @Test
-  public void testSimpleSerialization() throws Exception {
-    InetSocketAddress endpoint = new InetSocketAddress(12345);
-    Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
-    Status status = Status.ALIVE;
-
-    byte[] data = ServerSets.serializeServiceInstance(
-        endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
-
-    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
-
-    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
-    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
-    assertEquals(Status.ALIVE, instance.getStatus());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
deleted file mode 100644
index 5f6cdd8..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
-import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.easymock.Capture;
-import org.easymock.IExpectationSetters;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.fail;
-
-public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
-  private static final int PORT_A = 1234;
-  private static final int PORT_B = 8080;
-  private static final InetSocketAddress PRIMARY_ENDPOINT =
-      InetSocketAddress.createUnresolved("foo", PORT_A);
-  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
-      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
-
-  private IMocksControl control;
-  private SingletonServiceImpl.LeadershipListener listener;
-  private ServerSet serverSet;
-  private ServerSet.EndpointStatus endpointStatus;
-  private Candidate candidate;
-  private ExceptionalCommand<Group.JoinException> abdicate;
-
-  private SingletonService service;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void mySetUp() throws IOException {
-    control = createControl();
-    addTearDown(control::verify);
-    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
-    serverSet = control.createMock(ServerSet.class);
-    candidate = control.createMock(Candidate.class);
-    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
-    abdicate = control.createMock(ExceptionalCommand.class);
-
-    service = new SingletonServiceImpl(serverSet, candidate);
-  }
-
-  private void newLeader(
-      final String hostName,
-      Capture<Leader> leader,
-      LeadershipListener listener) throws Exception {
-
-    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
-        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
-        listener);
-
-    // This actually elects the leader.
-    leader.getValue().onElected(abdicate);
-  }
-
-  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
-    newLeader(hostName, leader, listener);
-  }
-
-  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
-    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
-  }
-
-  @Test
-  public void testLeadAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void teatLeadLeaveNoAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    abdicate.execute();
-
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void testLeadJoinFailure() throws Exception {
-    Capture<Leader> leaderCapture = new Capture<Leader>();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-
-    try {
-      controlCapture.getValue().advertise();
-      fail("Join should have failed.");
-    } catch (SingletonService.AdvertiseException e) {
-      // Expected.
-    }
-
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testAdvertiseAfterLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test
-  public void testLeadMulti() throws Exception {
-    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
-    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
-
-    for (int i = 0; i < 5; i++) {
-      Capture<Leader> leaderCapture = new Capture<Leader>();
-      leaderCaptures.add(leaderCapture);
-      Capture<LeaderControl> controlCapture = createCapture();
-      leaderControlCaptures.add(controlCapture);
-
-      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-      listener.onLeading(capture(controlCapture));
-      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
-      Map<String, InetSocketAddress> aux =
-          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
-      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
-      endpointStatus.leave();
-      abdicate.execute();
-    }
-
-    control.replay();
-
-    for (int i = 0; i < 5; i++) {
-      final String leaderName = "foo" + i;
-      newLeader(leaderName, leaderCaptures.get(i));
-      leaderControlCaptures.get(i).getValue().advertise();
-      leaderControlCaptures.get(i).getValue().leave();
-    }
-  }
-
-  @Test
-  public void testLeaderLeaves() throws Exception {
-    control.replay();
-    shutdownNetwork();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
deleted file mode 100644
index 5eee235..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * @author John Sirois
- */
-public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
-
-  public ZooKeeperClientTest() {
-    super(Amount.of(1, Time.DAYS));
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    final ZooKeeperClient zkClient = createZkClient();
-    shutdownNetwork();
-    try {
-      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
-      fail("Expected client connection to timeout while network down");
-    } catch (TimeoutException e) {
-      assertTrue(zkClient.isClosed());
-    }
-    assertNull(zkClient.getZooKeeperClientForTests());
-
-    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
-    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
-    new Thread(() -> {
-      try {
-        client.set(zkClient.get());
-      } catch (ZooKeeperConnectionException e) {
-        throw new RuntimeException(e);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      } finally {
-        blockingGetComplete.countDown();
-      }
-    }).start();
-
-    restartNetwork();
-
-    // Hung blocking connects should succeed when server connection comes up
-    blockingGetComplete.await();
-    assertNotNull(client.get());
-
-    // New connections should succeed now that network is back up
-    long sessionId = zkClient.get().getSessionId();
-
-    // While connected the same client should be reused (no new connections while healthy)
-    assertSame(client.get(), zkClient.get());
-
-    shutdownNetwork();
-    // Our client doesn't know the network is down yet so we should be able to get()
-    ZooKeeper zooKeeper = zkClient.get();
-    try {
-      zooKeeper.exists("/", false);
-      fail("Expected client operation to fail while network down");
-    } catch (ConnectionLossException e) {
-      // expected
-    }
-
-    restartNetwork();
-    assertEquals("Expected connection to be re-established with existing session",
-        sessionId, zkClient.get().getSessionId());
-  }
-
-  /**
-   * Test that if a blocking get() call gets interrupted, after a connection has been created
-   * but before it's connected, the zk connection gets closed.
-   */
-  @Test
-  public void testGetInterrupted() throws Exception {
-    final ZooKeeperClient zkClient = createZkClient();
-    shutdownNetwork();
-
-    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
-    final AtomicBoolean interrupted = new AtomicBoolean();
-    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
-    Thread getThread = new Thread(() -> {
-      try {
-        client.set(zkClient.get());
-      } catch (ZooKeeperConnectionException e) {
-        throw new RuntimeException(e);
-      } catch (InterruptedException e) {
-        interrupted.set(true);
-        throw new RuntimeException(e);
-      } finally {
-        blockingGetComplete.countDown();
-      }
-    });
-    getThread.start();
-
-    while (zkClient.getZooKeeperClientForTests() == null) {
-      Thread.sleep(100);
-    }
-
-    getThread.interrupt();
-    blockingGetComplete.await();
-
-    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
-    assertTrue("The waiter thread should have been interrupted", interrupted.get());
-    assertTrue(zkClient.isClosed());
-  }
-
-  @Test
-  public void testClose() throws Exception {
-    ZooKeeperClient zkClient = createZkClient();
-    zkClient.close();
-
-    // Close should be idempotent
-    zkClient.close();
-
-    long firstSessionId = zkClient.get().getSessionId();
-
-    // Close on an open client should force session re-establishment
-    zkClient.close();
-
-    assertNotEquals(firstSessionId, zkClient.get().getSessionId());
-  }
-
-  @Test
-  public void testCredentials() throws Exception {
-    String path = "/test";
-    ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
-    assertEquals(path,
-        authenticatedClient.get().create(path, "42".getBytes(),
-            ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
-
-    ZooKeeperClient unauthenticatedClient = createZkClient();
-    assertEquals("42", getData(unauthenticatedClient, path));
-    try {
-      setData(unauthenticatedClient, path, "37");
-      fail("Expected unauthenticated write attempt to fail");
-    } catch (NoAuthException e) {
-      assertEquals("42", getData(unauthenticatedClient, path));
-    }
-
-    ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
-    assertEquals("42", getData(nonOwnerClient, path));
-    try {
-      setData(nonOwnerClient, path, "37");
-      fail("Expected non owner write attempt to fail");
-    } catch (NoAuthException e) {
-      assertEquals("42", getData(nonOwnerClient, path));
-    }
-
-    ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
-    setData(authenticatedClient2, path, "37");
-    assertEquals("37", getData(authenticatedClient2, path));
-  }
-
-  @Test
-  public void testChrootPath() throws Exception {
-    ZooKeeperClient rootClient = createZkClient();
-    String rootPath = "/test";
-    String subPath = "/test/subtest";
-    assertEquals(rootPath,
-            rootClient.get().create(rootPath, "42".getBytes(),
-                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-    assertEquals(subPath,
-            rootClient.get().create(subPath, "37".getBytes(),
-                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-
-    ZooKeeperClient chrootedClient = createZkClient(rootPath);
-    assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
-  }
-
-  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
-    zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
-  }
-
-  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
-    return new String(zkClient.get().getData(path, false, null));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
index 9e482a6..5eb3c5e 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -13,87 +13,16 @@
  */
 package org.apache.aurora.common.zookeeper;
 
-import com.google.common.base.Charsets;
-
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * @author John Sirois
  */
-public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
-  @Test
-  public void testEnsurePath() throws Exception {
-    ZooKeeperClient zkClient = createZkClient();
-    zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
-
-    assertNull(zkClient.get().exists("/foo", false));
-    ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
-
-    zkClient = createZkClient();
-    zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
-
-    // Anyone can check for existence in ZK
-    assertNotNull(zkClient.get().exists("/foo", false));
-    assertNotNull(zkClient.get().exists("/foo/bar", false));
-    assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
-
-    try {
-      zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
-      fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
-           + "rejected");
-    } catch (NoAuthException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
-    String nodePath = "/foo";
-    ZooKeeperClient zkClient = createZkClient();
-
-    zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-
-    Stat initStat = new Stat();
-    byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
-    assertArrayEquals("init".getBytes(), initialData);
-
-    // bump the version
-    Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
-
-    try {
-      zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
-      fail("expected correct version to be required");
-    } catch (BadVersionException e) {
-      // expected
-    }
-
-    // expect using the correct version to work
-    Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
-    assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
-
-    zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
-    Stat forceWriteStat = new Stat();
-    byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
-    assertArrayEquals("force-write".getBytes(), forceWriteData);
-
-    assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
-    assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
-  }
+public class ZooKeeperUtilsTest extends BaseZooKeeperTest {
 
   @Test
   public void testNormalizingPath() throws Exception {
@@ -135,5 +64,4 @@ public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
       // expected
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
deleted file mode 100644
index 339f63b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Exposed;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.SingletonService;
-import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.apache.zookeeper.data.ACL;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Binding module for utilities to advertise the network presence of the scheduler.
- *
- * Uses a fork of Twitter commons/zookeeper.
- */
-class CommonsServiceDiscoveryModule extends PrivateModule {
-
-  private final String discoveryPath;
-  private final ZooKeeperConfig zooKeeperConfig;
-
-  CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
-    this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
-    this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
-  }
-
-  @Override
-  protected void configure() {
-    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
-    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
-
-    bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
-    expose(ServiceGroupMonitor.class);
-  }
-
-  @Provides
-  @Singleton
-  ZooKeeperClient provideZooKeeperClient(
-      @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
-
-    return new ZooKeeperClient(
-        zooKeeperConfig.getSessionTimeout(),
-        zooKeeperConfig.getCredentials(),
-        zooKeeperConfig.getChrootPath(),
-        zooKeeperCluster);
-  }
-
-  @Provides
-  @Singleton
-  ServerSetImpl provideServerSet(
-      ZooKeeperClient client,
-      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
-
-    return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
-  }
-
-  @Provides
-  @Singleton
-  DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
-    // Used for a type re-binding of the server set.
-    return serverSet;
-  }
-
-  // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
-  @Provides
-  @Singleton
-  @Exposed
-  SingletonService provideSingletonService(
-      ZooKeeperClient client,
-      ServerSetImpl serverSet,
-      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
-
-    return new SingletonServiceImpl(
-        serverSet,
-        SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
deleted file mode 100644
index 9161455..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-
-import static java.util.Objects.requireNonNull;
-
-class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
-  private Optional<Command> closeCommand = Optional.empty();
-  private final DynamicHostSet<ServiceInstance> serverSet;
-  private final AtomicReference<ImmutableSet<ServiceInstance>> services =
-      new AtomicReference<>(ImmutableSet.of());
-
-  @Inject
-  CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
-    this.serverSet = requireNonNull(serverSet);
-  }
-
-  @Override
-  public void start() throws MonitorException {
-    try {
-      closeCommand = Optional.of(serverSet.watch(services::set));
-    } catch (DynamicHostSet.MonitorException e) {
-      throw new MonitorException("Unable to watch scheduler host set.", e);
-    }
-  }
-
-  @Override
-  public void close() {
-    closeCommand.ifPresent(Command::execute);
-  }
-
-  @Override
-  public ImmutableSet<ServiceInstance> get() {
-    return services.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index 40cda8c..77f90ee 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -37,7 +37,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.Encoding;
 import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.RetryPolicy;
@@ -76,7 +76,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
 
-    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
+    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(Encoding.JSON_CODEC);
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index 1e7b9ce..48c7bfd 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -40,13 +40,6 @@ public final class FlaggedZooKeeperConfig {
 
   @Parameters(separators = "=")
   public static class Options {
-    @Parameter(names = "-zk_use_curator",
-        description =
-            "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
-                + "commons/zookeeper (the legacy library) is used.",
-        arity = 1)
-    public boolean useCurator = true;
-
     @Parameter(names = "-zk_in_proc",
         description =
             "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
@@ -87,7 +80,6 @@ public final class FlaggedZooKeeperConfig {
    */
   public static ZooKeeperConfig create(Options opts) {
     return new ZooKeeperConfig(
-        opts.useCurator,
         opts.zkEndpoints,
         Optional.fromNullable(opts.chrootPath),
         opts.inProcess,

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index 917a567..7e3b6c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -28,7 +28,6 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
@@ -85,15 +84,7 @@ public class ServiceDiscoveryModule extends AbstractModule {
       clusterBinder.toInstance(zooKeeperConfig.getServers());
     }
 
-    install(discoveryModule());
-  }
-
-  private Module discoveryModule() {
-    if (zooKeeperConfig.isUseCurator()) {
-      return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-    } else {
-      return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-    }
+    install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index 433ed31..1a7e8cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -44,13 +44,11 @@ public class ZooKeeperConfig {
   /**
    * Creates a new client configuration with defaults for the session timeout and credentials.
    *
-   * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
    * @param servers ZooKeeper server addresses.
    * @return A new configuration.
    */
-  public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
+  public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
     return new ZooKeeperConfig(
-        useCurator,
         servers,
         Optional.absent(), // chrootPath
         false,
@@ -59,7 +57,6 @@ public class ZooKeeperConfig {
         Optional.absent()); // credentials
   }
 
-  private final boolean useCurator;
   private final Iterable<InetSocketAddress> servers;
   private final boolean inProcess;
   private final Amount<Integer, Time> sessionTimeout;
@@ -77,7 +74,6 @@ public class ZooKeeperConfig {
    * @param credentials ZooKeeper authentication credentials.
    */
   ZooKeeperConfig(
-      boolean useCurator,
       Iterable<InetSocketAddress> servers,
       Optional<String> chrootPath,
       boolean inProcess,
@@ -85,7 +81,6 @@ public class ZooKeeperConfig {
       Amount<Integer, Time> connectionTimeout,
       Optional<Credentials> credentials) {
 
-    this.useCurator = useCurator;
     this.servers = MorePreconditions.checkNotBlank(servers);
     this.chrootPath = requireNonNull(chrootPath);
     this.inProcess = inProcess;
@@ -103,7 +98,6 @@ public class ZooKeeperConfig {
    */
   public ZooKeeperConfig withCredentials(Credentials newCredentials) {
     return new ZooKeeperConfig(
-        useCurator,
         servers,
         chrootPath,
         inProcess,
@@ -112,10 +106,6 @@ public class ZooKeeperConfig {
         Optional.of(newCredentials));
   }
 
-  boolean isUseCurator() {
-    return useCurator;
-  }
-
   Iterable<InetSocketAddress> getServers() {
     return servers;
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index a363e70..8e3c1de 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.app;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Set;
@@ -47,9 +48,7 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -110,10 +109,9 @@ import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SchedulerIT extends BaseZooKeeperClientTest {
+public class SchedulerIT extends BaseZooKeeperTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
 
@@ -153,7 +151,6 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
   private Stream logStream;
   private StreamMatcher streamMatcher;
   private EntrySerializer entrySerializer;
-  private ZooKeeperClient zkClient;
   private File backupDir;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -181,11 +178,9 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     entrySerializer = new EntrySerializer.EntrySerializerImpl(
         Amount.of(512, Data.KB),
         Hashing.md5());
-
-    zkClient = createZkClient();
   }
 
-  private void startScheduler() throws Exception {
+  private Injector startScheduler() throws Exception {
     // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
     // AppLauncher.
     Module testModule = new AbstractModule() {
@@ -215,8 +210,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     };
     ZooKeeperConfig zkClientConfig =
         ZooKeeperConfig.create(
-            true, // useCurator
-            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
+            ImmutableList.of(
+                InetSocketAddress.createUnresolved("localhost", getServer().getPort())))
             .withCredentials(Credentials.digestCredentials("mesos", "mesos"));
     SchedulerMain main = SchedulerMain.class.newInstance();
     Injector injector = Guice.createInjector(
@@ -245,21 +240,35 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     });
     injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
         .awaitHealthy();
+    return injector;
   }
 
-  private void awaitSchedulerReady() throws Exception {
+  private void awaitSchedulerReady(Injector injector) throws Exception {
     executor.submit(() -> {
-      ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
-      final CountDownLatch schedulerReady = new CountDownLatch(1);
-      schedulerService.watch(hostSet -> {
-        if (!hostSet.isEmpty()) {
-          schedulerReady.countDown();
+      ServiceGroupMonitor groupMonitor = injector.getInstance(ServiceGroupMonitor.class);
+      try {
+        // A timeout is used because certain types of assertion errors (mocks) will not surface
+        // until the main test thread exits this body of code.
+        long waited = 0;
+        while (waited < 5000 && groupMonitor.get().isEmpty()) {
+          try {
+            Thread.sleep(100);
+            waited += 100;
+          } catch (InterruptedException e) {
+            // Restore the interrupt flag and stop polling: with the flag set, every further
+            // sleep() would throw immediately without advancing 'waited', so the loop would
+            // spin with no timeout.
+            Thread.currentThread().interrupt();
+            break;
+          }
         }
-      });
-      // A timeout is used because certain types of assertion errors (mocks) will not surface
-      // until the main test thread exits this body of code.
-      assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
-      return null;
+      } finally {
+        try {
+          groupMonitor.close();
+        } catch (IOException e) {
+          LOG.info("Failed to close:" + e, e);
+        }
+      }
     }).get();
   }
 
@@ -345,7 +354,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     expect(driver.stop(true)).andReturn(Protos.Status.DRIVER_STOPPED).anyTimes();
 
     control.replay();
-    startScheduler();
+    Injector injector = startScheduler();
 
     driverStarted.await();
     scheduler.getValue().registered(
@@ -353,7 +362,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
         Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MASTER);
 
-    awaitSchedulerReady();
+    awaitSchedulerReady(injector);
 
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());