You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/19 00:34:06 UTC

[3/3] aurora git commit: Remove legacy commons ZK code

Remove legacy commons ZK code

Reviewed at https://reviews.apache.org/r/62652/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/15cb049f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/15cb049f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/15cb049f

Branch: refs/heads/master
Commit: 15cb049f3b5d1a3d662e8a396ce7020b107a2fe8
Parents: c638877
Author: Bill Farner <wf...@apache.org>
Authored: Wed Oct 18 17:33:50 2017 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Oct 18 17:33:50 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   4 +-
 .../aurora/common/net/pool/DynamicHostSet.java  |  59 --
 .../aurora/common/zookeeper/Candidate.java      |  78 ---
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ----
 .../aurora/common/zookeeper/Encoding.java       |  87 +++
 .../apache/aurora/common/zookeeper/Group.java   | 674 -------------------
 .../aurora/common/zookeeper/ServerSet.java      |  74 --
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ----------
 .../aurora/common/zookeeper/ServerSets.java     | 118 ----
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ----
 .../common/zookeeper/ZooKeeperClient.java       | 372 ----------
 .../aurora/common/zookeeper/ZooKeeperUtils.java | 106 +--
 .../testing/BaseZooKeeperClientTest.java        | 140 ----
 .../zookeeper/testing/ZooKeeperTestServer.java  |   6 +-
 .../common/zookeeper/CandidateImplTest.java     | 165 -----
 .../aurora/common/zookeeper/EncodingTest.java   |  44 ++
 .../aurora/common/zookeeper/GroupTest.java      | 321 ---------
 .../aurora/common/zookeeper/JsonCodecTest.java  |  18 +-
 .../common/zookeeper/ServerSetImplTest.java     | 258 -------
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 --
 .../zookeeper/SingletonServiceImplTest.java     | 243 -------
 .../common/zookeeper/ZooKeeperClientTest.java   | 210 ------
 .../common/zookeeper/ZooKeeperUtilsTest.java    |  76 +--
 .../CommonsServiceDiscoveryModule.java          | 102 ---
 .../discovery/CommonsServiceGroupMonitor.java   |  59 --
 .../CuratorServiceDiscoveryModule.java          |   4 +-
 .../discovery/FlaggedZooKeeperConfig.java       |   8 -
 .../discovery/ServiceDiscoveryModule.java       |  11 +-
 .../scheduler/discovery/ZooKeeperConfig.java    |  12 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  57 +-
 .../scheduler/config/CommandLineTest.java       |   2 -
 .../discovery/AbstractDiscoveryModuleTest.java  |  82 ---
 .../discovery/BaseCuratorDiscoveryTest.java     |   6 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 -
 .../CommonsServiceGroupMonitorTest.java         | 137 ----
 .../discovery/CuratorDiscoveryModuleTest.java   |  63 +-
 .../discovery/ZooKeeperConfigTest.java          |   5 +-
 .../aurora/scheduler/http/LeaderHealthTest.java |   3 +-
 38 files changed, 237 insertions(+), 4038 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 079f495..f4cc416 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -18,8 +18,10 @@
 - Increase default ZooKeeper session timeout from 4 to 15 seconds.
 - Add option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
 
-### Deprecations and removals
+### Deprecations and removals:
 
+- Removed the deprecated command line argument `-zk_use_curator`, removing the choice to use the
+  legacy ZooKeeper client.
 - Removed the `rewriteConfigs` thrift API call in the scheduler. This was a last-ditch mechanism
   to modify scheduler state on the fly. It was considered extremely risky to use since its
   inception, and is safer to abandon due to its lack of use and likelihood for code rot.

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
deleted file mode 100644
index df469ef..0000000
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-
-/**
- * A host set that can be monitored for changes.
- *
- * @param <T> The type that is used to identify members of the host set.
- */
-public interface DynamicHostSet<T> {
-
-  /**
-   * Registers a monitor to receive change notices for this server set as long as this jvm process
-   * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
-   * The monitor will be notified if the membership set or parameters of existing members have
-   * changed.
-   *
-   * @param monitor the server set monitor to call back when the host set changes
-   * @return A command which, when executed, will stop monitoring the host set.
-   * @throws MonitorException if there is a problem monitoring the host set
-   */
-  Command watch(HostChangeMonitor<T> monitor) throws MonitorException;
-
-  /**
-   * An interface to an object that is interested in receiving notification whenever the host set
-   * changes.
-   */
-  interface HostChangeMonitor<T> {
-
-    /**
-     * Called when either the available set of services changes (when a service dies or a new
-     * instance comes on-line) or when an existing service advertises a status or health change.
-     *
-     * @param hostSet the current set of available ServiceInstances
-     */
-    void onChange(ImmutableSet<T> hostSet);
-  }
-
-  class MonitorException extends Exception {
-    public MonitorException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
deleted file mode 100644
index 75c1b14..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Interface definition for becoming or querying for a ZooKeeper-based group leader.
- */
-public interface Candidate {
-
-  /**
-   * Returns the current group leader by querying ZooKeeper synchronously.
-   *
-   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
-   *     no leader
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading the leader information
-   * @throws InterruptedException if this thread is interrupted getting the leader
-   */
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
-
-  /**
-   * Encapsulates a leader that can be elected and subsequently defeated.
-   */
-  interface Leader {
-
-    /**
-     * Called when this leader has been elected.
-     *
-     * @param abdicate a command that can be used to abdicate leadership and force a new election
-     */
-    void onElected(ExceptionalCommand<JoinException> abdicate);
-
-    /**
-     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
-     * external event causes the leader to lose its leadership role (session expiration).
-     */
-    void onDefeated();
-  }
-
-  /**
-   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
-   * Upon election, the {@code onElected} callback will be executed and a command that can be used
-   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
-   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
-   * successfully abdicate are removed from the group and will not be eligible for leadership
-   * election unless {@link #offerLeadership(Leader)} is called again.
-   *
-   * @param leader the leader to notify of election and defeat events
-   * @throws JoinException if there was a problem joining the group
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to join the group and determine initial
-   *     election results
-   * @return a supplier that can be queried to find out if this leader is currently elected
-   */
-  public Supplier<Boolean> offerLeadership(Leader leader)
-        throws JoinException, WatchException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
deleted file mode 100644
index 98b5ee4..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements leader election for small groups of candidates.  This implementation is subject to the
- * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
- * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
- */
-public class CandidateImpl implements Candidate {
-  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
-
-  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
-
-  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
-    try {
-      return InetAddress.getLocalHost().getHostAddress().getBytes();
-    } catch (UnknownHostException e) {
-      LOG.warn("Failed to determine local address!", e);
-      return UNKNOWN_CANDIDATE_DATA;
-    }
-  };
-
-  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
-      candidates -> Ordering.natural().min(candidates);
-
-  private final Group group;
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given {@code group}.
-   */
-  public CandidateImpl(Group group) {
-    this.group = Preconditions.checkNotNull(group);
-  }
-
-  @Override
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-
-    String leaderId = getLeader(group.getMemberIds());
-    return leaderId == null
-        ? Optional.<byte[]>absent()
-        : Optional.of(group.getMemberData(leaderId));
-  }
-
-  @Override
-  public Supplier<Boolean> offerLeadership(final Leader leader)
-      throws JoinException, WatchException, InterruptedException {
-
-    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
-
-    final AtomicBoolean elected = new AtomicBoolean(false);
-    final AtomicBoolean abdicated = new AtomicBoolean(false);
-    group.watch(memberIds -> {
-      boolean noCandidates = Iterables.isEmpty(memberIds);
-      String memberId = membership.getMemberId();
-
-      if (noCandidates) {
-        LOG.warn("All candidates have temporarily left the group: " + group);
-      } else if (!Iterables.contains(memberIds, memberId)) {
-        LOG.error(
-            "Current member ID {} is not a candidate for leader, current voting: {}",
-            memberId, memberIds);
-      } else {
-        boolean electedLeader = memberId.equals(getLeader(memberIds));
-        boolean previouslyElected = elected.getAndSet(electedLeader);
-
-        if (!previouslyElected && electedLeader) {
-          LOG.info("Candidate {} is now leader of group: {}",
-              membership.getMemberPath(), memberIds);
-
-          leader.onElected(() -> {
-            membership.cancel();
-            abdicated.set(true);
-          });
-        } else if (!electedLeader) {
-          if (previouslyElected) {
-            leader.onDefeated();
-          }
-          LOG.info(
-              "Candidate {} waiting for the next leader election, current voting: {}",
-              membership.getMemberPath(), memberIds);
-        }
-      }
-    });
-
-    return () -> !abdicated.get() && elected.get();
-  }
-
-  @Nullable
-  private String getLeader(Iterable<String> memberIds) {
-    return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
new file mode 100644
index 0000000..204f5c4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * Utility class for encoding and decoding data stored in ZooKeeper nodes.
+ */
+public class Encoding {
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * This is the default encoding for service instance data in ZooKeeper.
+   */
+  public static final Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  private Encoding() {
+    // Utility class.
+  }
+
+  /**
+   * Returns the given Thrift service instance serialized to bytes with the given codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint(address.getHostName(), address.getPort()), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array containing a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
deleted file mode 100644
index 2720dd1..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.Commands;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups.  The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
- * each member of a group.
- */
-public class Group {
-  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
-
-  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
-  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
-  private final ZooKeeperClient zkClient;
-  private final ImmutableList<ACL> acl;
-  private final String path;
-
-  private final NodeScheme nodeScheme;
-  private final Predicate<String> nodeNameFilter;
-
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a group rooted at the given {@code path}.  Paths must be absolute and trailing or
-   * duplicate slashes will be normalized.  For example, all the following paths would create a
-   * group at the normalized path /my/distributed/group:
-   * <ul>
-   *   <li>/my/distributed/group
-   *   <li>/my/distributed/group/
-   *   <li>/my/distributed//group
-   * </ul>
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the absolute persistent path that represents this group
-   * @param nodeScheme the scheme that defines how nodes are created
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.acl = ImmutableList.copyOf(acl);
-    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
-    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
-    nodeNameFilter = Group.this.nodeScheme::isMember;
-
-    backoffHelper = new BackoffHelper();
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
-   * {@code namePrefix} of 'member_'.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
-   * {@link DefaultScheme} using {@code namePrefix}.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
-    this(zkClient, acl, path, new DefaultScheme(namePrefix));
-  }
-
-  public String getMemberPath(String memberId) {
-    return path + "/" + MorePreconditions.checkNotBlank(memberId);
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public String getMemberId(String nodePath) {
-    MorePreconditions.checkNotBlank(nodePath);
-    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
-        "Not a member of this group[%s]: %s", path, nodePath);
-
-    String memberId = StringUtils.substringAfterLast(nodePath, "/");
-    Preconditions.checkArgument(nodeScheme.isMember(memberId),
-        "Not a group member: %s", memberId);
-    return memberId;
-  }
-
-  /**
-   * Returns the current list of group member ids by querying ZooKeeper synchronously.
-   *
-   * @return the ids of all the present members of this group
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this group's member ids
-   * @throws InterruptedException if this thread is interrupted listing the group members
-   */
-  public Iterable<String> getMemberIds()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
-  }
-
-  /**
-   * Gets the data for one of this groups members by querying ZooKeeper synchronously.
-   *
-   * @param memberId the id of the member whose data to retrieve
-   * @return the data associated with the {@code memberId}
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this member's data
-   * @throws InterruptedException if this thread is interrupted retrieving the member data
-   */
-  public byte[] getMemberData(String memberId)
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return zkClient.get().getData(getMemberPath(memberId), false, null);
-  }
-
-  /**
-   * Represents membership in a distributed group.
-   */
-  public interface Membership {
-
-    /**
-     * Returns the persistent ZooKeeper path that represents this group.
-     */
-    String getGroupPath();
-
-    /**
-     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberId();
-
-    /**
-     * Returns the full ZooKeeper path to this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberPath();
-
-    /**
-     * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
-     * {@link Group#join()}.
-     *
-     * @return the new membership data
-     * @throws UpdateException if there was a problem updating the membership data
-     */
-    byte[] updateMemberData() throws UpdateException;
-
-    /**
-     * Cancels group membership by deleting the associated ZooKeeper member node.
-     *
-     * @throws JoinException if there is a problem deleting the node
-     */
-    void cancel() throws JoinException;
-  }
-
-  /**
-   * Indicates an error joining a group.
-   */
-  public static class JoinException extends Exception {
-    public JoinException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error updating a group member's data.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, null)}.
-   */
-  public final Membership join() throws JoinException, InterruptedException {
-    return join(NO_MEMBER_DATA, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(memberData, null)}.
-   */
-  public final Membership join(Supplier<byte[]> memberData)
-      throws JoinException, InterruptedException {
-
-    return join(memberData, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, onLoseMembership)}.
-   */
-  public final Membership join(@Nullable final Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    return join(NO_MEMBER_DATA, onLoseMembership);
-  }
-
-  /**
-   * Joins this group and returns the resulting Membership when successful.  Membership will be
-   * automatically cancelled when the current jvm process dies; however the returned Membership
-   * object can be used to cancel membership earlier.  Unless
-   * {@link Group.Membership#cancel()} is called the membership will
-   * be maintained by re-establishing it silently in the background.
-   *
-   * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
-   * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
-   * membership in the group.
-   *
-   * @param memberData a supplier of the data to store in the member node
-   * @param onLoseMembership a callback to notify when membership is lost
-   * @return a Membership object with the member details
-   * @throws JoinException if there was a problem joining the group
-   * @throws InterruptedException if this thread is interrupted awaiting completion of the join
-   */
-  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(memberData);
-    ensurePersistentGroupPath();
-
-    final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
-    return backoffHelper.doUntilResult(() -> {
-      try {
-        return groupJoiner.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to join group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to join group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to join group at path: " + path, e);
-          return null;
-        } else {
-          throw new JoinException("Problem joining partition group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private void ensurePersistentGroupPath() throws JoinException, InterruptedException { // shared by join() and watch()
-    backoffHelper.doUntilSuccess(() -> { // supplier returns true when done, false to ask for another attempt
-      try {
-        ZooKeeperUtils.ensurePath(zkClient, acl, path);
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt(); // restore the interrupt flag before converting to JoinException
-        throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-        return false;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) { // the client decides which KeeperExceptions are worth retrying
-          LOG.warn("Temporary error ensuring path: " + path, e);
-          return false;
-        } else {
-          throw new JoinException("Problem ensuring group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private class ActiveMembership implements Membership { // membership handle that re-joins the group when its node is lost
-    private final Supplier<byte[]> memberData;
-    private final Command onLoseMembership;
-    private String nodePath; // path returned by create(); stays null until the first successful join
-    private String memberId;
-    private volatile boolean cancelled;
-    private byte[] membershipData; // last payload written to the member node
-
-    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
-      this.memberData = memberData;
-      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; // a null callback becomes a no-op
-    }
-
-    @Override
-    public String getGroupPath() {
-      return path;
-    }
-
-    @Override
-    public synchronized String getMemberId() {
-      return memberId;
-    }
-
-    @Override
-    public synchronized String getMemberPath() {
-      return nodePath;
-    }
-
-    @Override
-    public synchronized byte[] updateMemberData() throws UpdateException {
-      byte[] membershipData = memberData.get(); // local shadows the field; the field holds the last written value
-      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { // skip the write when the payload is unchanged
-        try {
-          zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
-          this.membershipData = membershipData;
-        } catch (KeeperException e) {
-          throw new UpdateException("Problem updating membership data.", e);
-        } catch (InterruptedException e) { // NOTE(review): interrupt flag is not restored here, unlike the other handlers in this class
-          throw new UpdateException("Interrupted attempting to update membership data.", e);
-        } catch (ZooKeeperConnectionException e) {
-          throw new UpdateException(
-              "Could not connect to the ZooKeeper cluster to update membership data.", e);
-        }
-      }
-      return membershipData;
-    }
-
-    @Override
-    public synchronized void cancel() throws JoinException {
-      if (!cancelled) { // cancelling twice is a no-op
-        try {
-          backoffHelper.doUntilSuccess(() -> { // supplier returns true when done, false to ask for another attempt
-            try {
-              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
-              return true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
-            } catch (ZooKeeperConnectionException e) {
-              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-              return false;
-            } catch (NoNodeException e) { // node already gone: treated as a successful cancel
-              LOG.info("Membership already cancelled, node at path: " + nodePath +
-                       " has been deleted");
-              return true;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.warn("Temporary error cancelling membership: " + nodePath, e);
-                return false;
-              } else {
-                throw new JoinException("Problem cancelling membership: " + nodePath, e);
-              }
-            }
-          });
-          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Problem cancelling membership: " + nodePath, e);
-        }
-      }
-    }
-
-    private class CancelledException extends IllegalStateException { /* marker */ }
-
-    synchronized Membership join()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (cancelled) {
-        throw new CancelledException(); // unchecked marker; the tryJoin supplier below catches it and stops retrying
-      }
-
-      if (nodePath == null) { // true only before the first successful create()
-        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
-        // registered once.
-        zkClient.registerExpirationHandler(this::tryJoin);
-      }
-
-      byte[] membershipData = memberData.get();
-      String nodeName = nodeScheme.createName(membershipData);
-      CreateMode createMode = nodeScheme.isSequential()
-          ? CreateMode.EPHEMERAL_SEQUENTIAL
-          : CreateMode.EPHEMERAL;
-      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); // keep the path create() returns
-      memberId = Group.this.getMemberId(nodePath);
-      LOG.info("Set group member ID to " + memberId);
-      this.membershipData = membershipData;
-
-      // Re-join if our ephemeral node goes away due to maliciousness.
-      zkClient.get().exists(nodePath, event -> { // watch on our own node
-        if (event.getType() == EventType.NodeDeleted) {
-          tryJoin();
-        }
-      });
-
-      return this;
-    }
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = // one re-join attempt: true = stop, false = retry
-        () -> {
-          try {
-            join();
-            return true;
-          } catch (CancelledException e) {
-            // Lost a cancel race - that's ok.
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-joining group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
-            }
-          }
-        };
-
-    private synchronized void tryJoin() { // invoked from the expiration handler and the node-deleted watch
-      onLoseMembership.execute(); // tell the owner membership was lost before attempting to re-join
-      try {
-        backoffHelper.doUntilSuccess(tryJoin);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
-      }
-    }
-  }
-
-  /**
-   * Callback interface for objects that want to be told when a group's membership changes.
-   */
-  public interface GroupChangeListener {
-
-    /**
-     * Called with the current member ids each time the group's membership changes.
-     *
-     * @param memberIds the current member ids
-     */
-    void onGroupChange(Iterable<String> memberIds);
-  }
-
-  /**
-   * Strategy for naming the nodes that represent members of a distributed group and for
-   * recognizing those nodes among the children of the group path.
-   */
-  public interface NodeScheme {
-    /**
-     * Decides, from the node's name alone, whether a child node represents a group member.
-     *
-     * @param nodeName the name of a child node found in a group
-     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
-     */
-    boolean isMember(String nodeName);
-
-    /**
-     * Produces the node name passed to ZooKeeper when this process joins the group.
-     *
-     * @param membershipData the data that will be stored in this node
-     * @return the name for the node that will represent this process in the group
-     */
-    String createName(byte[] membershipData);
-
-    /**
-     * Selects between ephemeral sequential and plain ephemeral member nodes.
-     *
-     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
-     */
-    boolean isSequential();
-  }
-
-  /**
-   * Checked exception thrown by {@code watch} when the initial group watch cannot be set up.
-   */
-  public static class WatchException extends Exception {
-    public WatchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Watches this group for the lifetime of this jvm process.  This method will block until the
-   * current group members are available, notify the {@code groupChangeListener} and then return.
-   * All further changes to the group membership will cause notifications on a background thread.
-   *
-   * @param groupChangeListener the listener to notify of group membership change events
-   * @return A command which, when executed, will stop watching the group.
-   * @throws WatchException if the group path cannot be created or the 1st membership list fails
-   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
-   */
-  public final Command watch(final GroupChangeListener groupChangeListener)
-      throws WatchException, InterruptedException {
-    Preconditions.checkNotNull(groupChangeListener);
-
-    try {
-      ensurePersistentGroupPath();
-    } catch (JoinException e) { // translate into this method's own checked exception type
-      throw new WatchException("Failed to create group path: " + path, e);
-    }
-
-    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        groupMonitor.watchGroup();
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt(); // restore the interrupt flag before converting to WatchException
-        throw new WatchException("Interrupted trying to watch group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to watch group at path: " + path, e);
-        return null; // NOTE(review): null, not false as in ensurePersistentGroupPath - confirm doUntilSuccess tolerates it
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to watch group at path: " + path, e);
-          return null; // NOTE(review): same null-vs-false question as above
-        } else {
-          throw new WatchException("Problem trying to watch group at path: " + path, e);
-        }
-      }
-    });
-    return groupMonitor::stopWatching; // the returned Command only flips the monitor's stopped flag
-  }
-
-  /**
-   * Re-reads the group's children on every change and reports new membership to a listener.
-   */
-  private class GroupMonitor {
-    private final GroupChangeListener groupChangeListener;
-    private volatile boolean stopped = false;
-    private Set<String> members; // last membership handed to the listener; null until the first update
-
-    GroupMonitor(GroupChangeListener groupChangeListener) {
-      this.groupChangeListener = groupChangeListener;
-    }
-
-    private final Watcher groupWatcher = event -> { // passed to getChildren() in watchGroup()
-      if (event.getType() == EventType.NodeChildrenChanged) {
-        tryWatchGroup();
-      }
-    };
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = // one attempt: true = stop, false = retry
-        () -> {
-          try {
-            watchGroup();
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-watching group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
-            }
-          }
-        };
-
-    private void tryWatchGroup() { // invoked from groupWatcher and from the expiration handler
-      if (stopped) {
-        return;
-      }
-
-      try {
-        backoffHelper.doUntilSuccess(tryWatchGroup);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
-      }
-    }
-
-    private void watchGroup()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (stopped) {
-        return;
-      }
-
-      List<String> children = zkClient.get().getChildren(path, groupWatcher); // reads children and sets groupWatcher again
-      setMembers(Iterables.filter(children, nodeNameFilter)); // keep only names accepted by nodeNameFilter
-    }
-
-    private void stopWatching() {
-      // TODO(William Farner): Cancel the watch when
-      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
-      LOG.info("Stopping watch on " + this);
-      stopped = true; // the watch itself stays registered; later events are ignored via this flag
-    }
-
-    synchronized void setMembers(Iterable<String> members) {
-      if (stopped) {
-        LOG.info("Suppressing membership update, no longer watching " + this);
-        return;
-      }
-
-      if (this.members == null) { // true only on the first update
-        // Reset our watch on the group if session expires - only needs to be registered once.
-        zkClient.registerExpirationHandler(this::tryWatchGroup);
-      }
-
-      Set<String> membership = ImmutableSet.copyOf(members);
-      if (!membership.equals(this.members)) { // notify only when the member set actually changed
-        groupChangeListener.onGroupChange(members); // passes the caller's iterable, not the immutable snapshot
-        this.members = membership;
-      }
-    }
-  }
-
-  /**
-   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
-   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
-   * prefix is "member_", the node's full path will look something like
-   * {@code /discovery/servicename/member_0000000007}.
-   */
-  public static class DefaultScheme implements NodeScheme {
-    private final String namePrefix;
-    private final Pattern namePattern;
-
-    /**
-     * Creates a sequential node scheme based on the given node name prefix.
-     *
-     * @param namePrefix the prefix for the names of the member nodes; must not be blank
-     */
-    public DefaultScheme(String namePrefix) {
-      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
-      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); // literal prefix, optional '-', then digits
-    }
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return namePattern.matcher(nodeName).matches();
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return namePrefix; // membershipData is ignored; the numeric suffix is not produced here
-    }
-
-    @Override
-    public boolean isSequential() {
-      return true; // always sequential, so join() uses CreateMode.EPHEMERAL_SEQUENTIAL
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Group " + path; // identifies the group by its ZooKeeper path
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
deleted file mode 100644
index aeea02d..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-
-/**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
- * common service to advertise their presence to server-set protocol-aware clients.
- *
- * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
- * rendezvous data to zookeeper so that standard clients can interoperate.
- */
-public interface ServerSet {
-
-  /**
-   * Encodes a {@link ServiceInstance} as a JSON object.
-   *
-   * This is the default encoding for service instance data in ZooKeeper.
-   */
-  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints any additional endpoints keyed by their logical name
-   * @return an EndpointStatus handle that can later remove the endpoint from the server set
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException;
-
-  /**
-   * A handle to a joined service endpoint; its only operation is leaving the server set.
-   */
-  interface EndpointStatus {
-
-    /**
-     * Removes the endpoint from the server set.
-     *
-     * @throws UpdateException if there was a problem leaving the ServerSet.
-     */
-    void leave() throws UpdateException;
-  }
-
-  /**
-   * Indicates an error updating a service's status information.
-   */
-  class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index ace4980..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
- */
-public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
-  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
-
-  private final ZooKeeperClient zkClient;
-  private final Group group;
-  private final Codec<ServiceInstance> codec;
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a new ServerSet using open ZooKeeper node ACLs.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
-    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
-  }
-
-  /**
-   * Creates a new ServerSet for the given service {@code path}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
-   *     from a byte array
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
-    this.zkClient = checkNotNull(zkClient);
-    this.group = checkNotNull(group);
-    this.codec = checkNotNull(codec);
-
-    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
-    backoffHelper = new BackoffHelper();
-  }
-
-  @VisibleForTesting
-  ZooKeeperClient getZkClient() {
-    return zkClient;
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws Group.JoinException, InterruptedException {
-
-    checkNotNull(endpoint);
-    checkNotNull(additionalEndpoints);
-
-    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
-    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance; // group calls this for the node payload
-    Group.Membership membership = group.join(serviceInstanceSupplier);
-
-    return () -> memberStatus.leave(membership); // EndpointStatus.leave() cancels the group membership
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
-    try {
-      return serverSetWatcher.watch();
-    } catch (Group.WatchException e) {
-      throw new MonitorException("ZooKeeper watch failed.", e);
-    } catch (InterruptedException e) { // NOTE(review): interrupt flag is not restored before wrapping - confirm intended
-      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
-    }
-  }
-
-  private class MemberStatus { // holds one member's endpoints and produces its serialized ServiceInstance
-    private final InetSocketAddress endpoint;
-    private final Map<String, InetSocketAddress> additionalEndpoints;
-
-    private MemberStatus(
-        InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints) {
-
-      this.endpoint = endpoint;
-      this.additionalEndpoints = additionalEndpoints;
-    }
-
-    synchronized void leave(Group.Membership membership) throws UpdateException {
-      try {
-        membership.cancel();
-      } catch (Group.JoinException e) { // re-thrown as the exception type EndpointStatus.leave() declares
-        throw new UpdateException(
-            "Failed to auto-cancel group membership on transition to DEAD status", e);
-      }
-    }
-
-    byte[] serializeServiceInstance() {
-      ServiceInstance serviceInstance = new ServiceInstance(
-          ServerSets.toEndpoint(endpoint),
-          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
-          Status.ALIVE); // status is always ALIVE here
-
-      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
-      try {
-        return ServerSets.serializeServiceInstance(serviceInstance, codec);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
-            serviceInstance + "to a byte[]", e); // NOTE(review): message has no space before "to"
-      }
-    }
-  }
-
-  private static class ServiceInstanceFetchException extends RuntimeException { // fetch or deserialization of a member failed
-    ServiceInstanceFetchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private static class ServiceInstanceDeletedException extends RuntimeException { // member node no longer exists
-    ServiceInstanceDeletedException(String path) {
-      super(path);
-    }
-  }
-
-  private class ServerSetWatcher { // turns group membership changes into HostChangeMonitor.onChange calls
-    private final ZooKeeperClient zkClient;
-    private final HostChangeMonitor<ServiceInstance> monitor;
-    @Nullable private ImmutableSet<ServiceInstance> serverSet; // last set sent to the monitor; null before the first notification
-
-    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
-      this.zkClient = zkClient;
-      this.monitor = monitor;
-    }
-
-    public Command watch() throws Group.WatchException, InterruptedException {
-      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
-
-      try {
-        return group.watch(this::notifyGroupChange);
-      } catch (Group.WatchException e) {
-        zkClient.unregister(onExpirationWatcher); // do not leave the handler registered when the watch failed
-        throw e;
-      } catch (InterruptedException e) {
-        zkClient.unregister(onExpirationWatcher); // same cleanup on interruption
-        throw e;
-      }
-    }
-
-    private ServiceInstance getServiceInstance(final String nodePath) {
-      try {
-        return backoffHelper.doUntilResult(() -> { // temporary failures yield null; presumably doUntilResult retries on null - confirm
-          try {
-            byte[] data = zkClient.get().getData(nodePath, false, null);
-            return ServerSets.deserializeServiceInstance(data, codec);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ServiceInstanceFetchException(
-                "Interrupted updating service data for: " + nodePath, e);
-          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            LOG.warn("Temporary error trying to updating service data for: " + nodePath, e);
-            return null;
-          } catch (NoNodeException e) { // the member node is gone: drop it from the cache and signal deletion
-            invalidateNodePath(nodePath);
-            throw new ServiceInstanceDeletedException(nodePath);
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
-              return null;
-            } else {
-              throw new ServiceInstanceFetchException(
-                  "Failed to update service data for: " + nodePath, e);
-            }
-          } catch (IOException e) {
-            throw new ServiceInstanceFetchException(
-                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
-          }
-        });
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceInstanceFetchException(
-            "Interrupted trying to update service data for: " + nodePath, e);
-      }
-    }
-
-    private final LoadingCache<String, ServiceInstance> servicesByMemberId = // member id -> instance, loaded from ZooKeeper on a miss
-        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
-          @Override public ServiceInstance load(String memberId) {
-            return getServiceInstance(group.getMemberPath(memberId));
-          }
-        });
-
-    private void rebuildServerSet() { // registered as the session expiration handler in watch()
-      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); // snapshot the ids before clearing
-      servicesByMemberId.invalidateAll();
-      notifyGroupChange(memberIds); // with the cache now empty, the snapshot ids are loaded again
-    }
-
-    private String invalidateNodePath(String deletedPath) {
-      String memberId = group.getMemberId(deletedPath);
-      servicesByMemberId.invalidate(memberId);
-      return memberId;
-    }
-
-    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = // member id -> instance, or null when the node was deleted
-        memberId -> {
-          // This get will trigger a fetch
-          try {
-            return servicesByMemberId.getUnchecked(memberId);
-          } catch (UncheckedExecutionException e) {
-            Throwable cause = e.getCause();
-            if (!(cause instanceof ServiceInstanceDeletedException)) {
-              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
-              throw new IllegalStateException(
-                  "Unexpected error fetching member data for: " + memberId, e);
-            }
-            return null; // deleted member; notifyGroupChange filters nulls out
-          }
-        };
-
-    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
-      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
-      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); // key view of the cache, not a copy
-
-      // Ignore no-op state changes except for the 1st when we've seen no group yet.
-      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
-        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
-        // Implicit removal from servicesByMemberId.
-        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); // copy first: the view is backed by the set being mutated
-
-        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
-            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
-
-        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
-      }
-    }
-
-    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
-      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
-      // if the server's status has not changed, we can skip any onChange updates.
-      if (!currentServerSet.equals(serverSet)) {
-        if (currentServerSet.isEmpty()) {
-          LOG.warn("server set empty for path " + group.getPath());
-        } else {
-          if (serverSet == null) {
-            LOG.info("received initial membership {}", currentServerSet);
-          } else {
-            logChange(currentServerSet);
-          }
-        }
-        serverSet = currentServerSet;
-        monitor.onChange(serverSet);
-      }
-    }
-
-    private void logChange(ImmutableSet<ServiceInstance> newServerSet) { // logs the size change plus who left and who joined
-      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
-      if (serverSet.size() != newServerSet.size()) {
-        message.append("from ").append(serverSet.size())
-            .append(" members to ").append(newServerSet.size());
-      }
-
-      Joiner joiner = Joiner.on("\n\t\t");
-
-      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
-      if (!left.isEmpty()) {
-        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
-      }
-
-      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
-      if (!joined.isEmpty()) {
-        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
-      }
-
-      LOG.info(message.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
deleted file mode 100644
index 01a54a5..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Common ServerSet related functions
- */
-public final class ServerSets {
-
-  private ServerSets() {
-    // Utility class.
-  }
-
-  /**
-   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
-   */
-  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
-      ServerSets::toEndpoint;
-
-  /**
-   * Creates a server set that registers at a single path applying the given ACL to all nodes
-   * created in the path.
-   *
-   * @param zkClient ZooKeeper client to register with; must not be null.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates; must not be
-   *     blank.
-   * @param zkPath Path to register at; must not be blank.
-   * @return A server set that registers at {@code zkPath}.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
-    Preconditions.checkNotNull(zkClient);
-    MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPath);
-
-    return new ServerSetImpl(zkClient, acl, zkPath);
-  }
-
-  /**
-   * Returns a serialized Thrift service instance object, encoded with the given codec.
-   *
-   * @param serviceInstance the Thrift service instance object to be serialized
-   * @param codec the codec to use to serialize a Thrift service instance object
-   * @return byte array that contains a serialized Thrift service instance
-   * @throws IOException if the codec fails to write the service instance
-   */
-  public static byte[] serializeServiceInstance(
-      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
-
-    // The codec writes to a stream; buffer in memory and hand back the raw bytes.
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    codec.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  /**
-   * Serializes a service instance based on endpoints.
-   * @see #serializeServiceInstance(ServiceInstance, Codec)
-   *
-   * @param address the target address of the service instance
-   * @param additionalEndpoints additional endpoints of the service instance
-   * @param status service status
-   * @param codec the codec to use to serialize the assembled service instance
-   * @return byte array that contains a serialized Thrift service instance
-   * @throws IOException if the codec fails to write the service instance
-   */
-  public static byte[] serializeServiceInstance(
-      InetSocketAddress address,
-      Map<String, Endpoint> additionalEndpoints,
-      Status status,
-      Codec<ServiceInstance> codec) throws IOException {
-
-    ServiceInstance serviceInstance =
-        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
-    return serializeServiceInstance(serviceInstance, codec);
-  }
-
-  /**
-   * Creates a service instance object deserialized from byte array.
-   *
-   * @param data the byte array contains a serialized Thrift service instance
-   * @param codec the codec to use to deserialize the byte array
-   * @return the service instance decoded from {@code data}
-   * @throws IOException if the codec fails to read the service instance
-   */
-  public static ServiceInstance deserializeServiceInstance(
-      byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
-    return codec.deserialize(new ByteArrayInputStream(data));
-  }
-
-  /**
-   * Creates an endpoint for the given InetSocketAddress.
-   *
-   * @param address the target address to create the endpoint for
-   * @return an endpoint carrying the host name and port of {@code address}
-   */
-  public static Endpoint toEndpoint(InetSocketAddress address) {
-    return new Endpoint(address.getHostName(), address.getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
deleted file mode 100644
index d9978a9..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
-
-public class SingletonServiceImpl implements SingletonService {
-  // Node name prefix handed to the Group backing candidates built by createSingletonCandidate.
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
-  /**
-   * Creates a candidate that can be combined with an existing server set to form a singleton
-   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a single zk path.
-   */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
-  }
-
-  private final ServerSet serverSet;
-  private final Candidate candidate;
-
-  /**
-   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
-   */
-  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
-  }
-
-  /**
-   * Offers leadership through the candidate and bridges its callbacks to {@code listener}: on
-   * election the listener receives a {@code LeaderControl} that can advertise the endpoints in
-   * the server set and later leave; on defeat the listener's {@code onDefeated} is invoked.
-   *
-   * @param endpoint The primary endpoint to advertise once the leader calls {@code advertise()}.
-   * @param additionalEndpoints Additional named endpoints advertised alongside {@code endpoint}.
-   * @param listener Receives the election callbacks; must not be null.
-   * @throws LeadException If joining the leadership group or reading its initial membership
-   *     fails.
-   * @throws InterruptedException Declared for the underlying leadership offer.
-   */
-  @Override
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws LeadException, InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    try {
-      candidate.offerLeadership(new Leader() {
-        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
-          listener.onLeading(new LeaderControl() {
-            // Null until advertise() has successfully joined the server set.
-            ServerSet.EndpointStatus endpointStatus = null;
-            // Flipped exactly once by leave(); blocks any later advertise() or second leave().
-            final AtomicBoolean left = new AtomicBoolean(false);
-
-            // Methods are synchronized to prevent simultaneous invocations.
-            @Override public synchronized void advertise()
-                throws AdvertiseException, InterruptedException {
-
-              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
-              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
-              try {
-                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-              } catch (JoinException e) {
-                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
-              }
-            }
-
-            @Override public synchronized void leave() throws LeaveException {
-              // The flag is set before any work is attempted, so leave() is strictly one-shot.
-              Preconditions.checkState(left.compareAndSet(false, true),
-                  "Cannot leave more than once.");
-              // Withdraw the advertisement first (only if advertise() succeeded), then abdicate.
-              // Note: if withdrawing throws, abdicate is never executed, and because the flag is
-              // already set a retry of leave() is rejected by the state check above.
-              if (endpointStatus != null) {
-                try {
-                  endpointStatus.leave();
-                } catch (ServerSet.UpdateException e) {
-                  throw new LeaveException("Problem updating endpoint status for abdicating leader " +
-                      "at endpoint " + endpoint, e);
-                }
-              }
-              try {
-                abdicate.execute();
-              } catch (JoinException e) {
-                throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
-              }
-            }
-          });
-        }
-
-        @Override public void onDefeated() {
-          listener.onDefeated();
-        }
-      });
-    } catch (JoinException e) {
-      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
-    } catch (Group.WatchException e) {
-      throw new LeadException("Problem getting initial membership list for leadership group.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
deleted file mode 100644
index ce243fb..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.InetSocketAddressHelper;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages a connection to a ZooKeeper cluster.
- */
-public class ZooKeeperClient {
-
-  /**
-   * Indicates an error connecting to a zookeeper cluster.
-   */
-  public static class ZooKeeperConnectionException extends Exception {
-    // Static nested on purpose: the exception uses no state of the enclosing client, and a
-    // non-static inner class would carry a hidden reference to the ZooKeeperClient that threw it.
-    ZooKeeperConnectionException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  // Immutable snapshot of a ZooKeeper session's id and password, captured after a successful
-  // connect.  Static nested because it needs nothing from the enclosing client instance.
-  private static final class SessionState {
-    private final long sessionId;
-    private final byte[] sessionPasswd;
-
-    private SessionState(long sessionId, byte[] sessionPasswd) {
-      this.sessionId = sessionId;
-      this.sessionPasswd = sessionPasswd;
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
-
-  // A zero connection timeout; get(Amount) treats any non-positive value as "wait forever".
-  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
-
-  // Session timeout converted to milliseconds, as passed to the ZooKeeper constructor.
-  private final int sessionTimeoutMs;
-  // When present, applied via addAuthInfo after each newly established connection.
-  private final Optional<Credentials> credentials;
-  // Comma-joined server strings with the chroot path (if any) appended.
-  private final String zooKeeperServers;
-  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
-  // made from within long synchronized blocks.
-  private volatile ZooKeeper zooKeeper;
-  // Id/password of the current session; set after a successful connect, cleared by close().
-  private SessionState sessionState;
-
-  // Top-level watchers; iterated by the watcher-processor thread for every queued event.
-  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
-  // Hand-off queue: the connection watcher offers events, the processor thread takes them.
-  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
-
-  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
-      InetSocketAddress... addresses) {
-    // The required first address followed by any extras, collected into one immutable set.
-    ImmutableSet.Builder<InetSocketAddress> merged = ImmutableSet.builder();
-    merged.add(address);
-    merged.add(addresses);
-    return merged.build();
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get()}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
-      InetSocketAddress... zooKeeperServers) {
-    // Merge the required server and the varargs into one collection, then delegate.
-    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-    // No credentials and no chroot path.
-    this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get()}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
-      InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
-    // Credentials are mandatory here; no chroot path; servers are merged into one collection.
-    this(sessionTimeout,
-        Optional.of(credentials),
-        Optional.absent(),
-        combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-    // Credentials are mandatory here; no chroot path.
-    this(sessionTimeout, Optional.of(credentials), Optional.absent(), zooKeeperServers);
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the first call to
-   * {@link #get}.  All successful connections will be authenticated with the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with, if present
-   * @param chrootPath an optional chroot path
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
-      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
-    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
-    this.credentials = Preconditions.checkNotNull(credentials);
-
-    // Reject a malformed chroot up front rather than when the first connection is attempted.
-    if (chrootPath.isPresent()) {
-      PathUtils.validatePath(chrootPath.get());
-    }
-
-    Preconditions.checkNotNull(zooKeeperServers);
-    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
-        "Must present at least 1 ZK server");
-
-    // Registered watchers are notified from a dedicated daemon thread that drains eventQueue,
-    // so watcher callbacks do not run on the thread that delivered the event.
-    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
-      @Override
-      public void run() {
-        while (true) {
-          try {
-            WatchedEvent event = eventQueue.take();
-            for (Watcher watcher : watchers) {
-              try {
-                watcher.process(event);
-              } catch (RuntimeException e) {
-                // One misbehaving watcher must not kill this thread: it is the only dispatcher,
-                // so letting the exception escape would silently stop delivery to all watchers.
-                LOG.warn("Watcher " + watcher + " failed to process event " + event, e);
-              }
-            }
-          } catch (InterruptedException ignored) {
-            // Deliberately ignored, as before: nothing holds a reference to this daemon thread
-            // to shut it down, so it keeps dispatching for as long as the JVM runs.
-          }
-        }
-      }
-    };
-    watcherProcessor.setDaemon(true);
-    watcherProcessor.start();
-
-    // Build the connect string: de-duplicated servers joined by commas, then the chroot path.
-    Iterable<String> servers =
-        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
-            InetSocketAddressHelper::toString);
-    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none has yet been
-   * established or a previous connection was disconnected or had its session time out.  This method
-   * will attempt to re-use sessions when possible.  Equivalent to:
-   * <pre>get(Amount.of(0L, ...))</pre>.
-   *
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a connection to be established
-   */
-  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
-    ZooKeeper client;
-    try {
-      client = get(WAIT_FOREVER);
-    } catch (TimeoutException e) {
-      // A zero timeout means "wait forever", so a timeout here is unexpected; it is rethrown as
-      // an InterruptedException carrying the timeout as its cause.
-      InterruptedException unexpected =
-          new InterruptedException("Got an unexpected TimeoutException for 0 wait");
-      unexpected.initCause(e);
-      throw unexpected;
-    }
-    return client;
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none has yet been
-   * established or a previous connection was disconnected or had its session time out.  This
-   * method will attempt to re-use sessions when possible.
-   *
-   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
-   *     cluster to be established; 0 to wait forever
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a connection to be established
-   * @throws TimeoutException if a connection could not be established within the given
-   *     {@code connectionTimeout}
-   */
-  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
-      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
-
-    if (zooKeeper == null) {
-      // Released by the connection watcher once the client reports SyncConnected.
-      final CountDownLatch connected = new CountDownLatch(1);
-      Watcher watcher = event -> {
-        switch (event.getType()) {
-          // Guard the None type since this watch may be used as the default watch on calls by
-          // the client outside our control.
-          case None:
-            switch (event.getState()) {
-              case Expired:
-                LOG.info("Zookeeper session expired. Event: " + event);
-                close();
-                break;
-              case SyncConnected:
-                connected.countDown();
-                break;
-              default:
-                // Other connection states need no handling here.
-                break;
-            }
-            break;
-          default:
-            // Non-connection events are only forwarded to the registered watchers below.
-            break;
-        }
-
-        // Every event, handled above or not, is queued for the registered top-level watchers.
-        eventQueue.offer(event);
-      };
-
-      try {
-        // NOTE(review): close() clears sessionState together with zooKeeper, so within this
-        // class the session-reuse branch looks unreachable from here - confirm.
-        zooKeeper = (sessionState != null)
-          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
-            sessionState.sessionPasswd)
-          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
-      } catch (IOException e) {
-        throw new ZooKeeperConnectionException(
-            "Problem connecting to servers: " + zooKeeperServers, e);
-      }
-
-      try {
-        if (connectionTimeout.getValue() > 0) {
-          if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
-            close();
-            throw new TimeoutException("Timed out waiting for a ZK connection after "
-                                       + connectionTimeout);
-          }
-        } else {
-          connected.await();
-        }
-      } catch (InterruptedException ex) {
-        // Applies to both the timed and the untimed wait: discard the half-initialised client.
-        // Otherwise the next get() would return it without waiting for the connection and
-        // without ever applying the credentials or recording the session state.
-        LOG.info("Interrupted while waiting to connect to zooKeeper");
-        close();
-        throw ex;
-      }
-      if (credentials.isPresent()) {
-        Credentials zkCredentials = credentials.get();
-        zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
-      }
-
-      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
-    }
-    return zooKeeper;
-  }
-
-  /**
-   * Clients that need to re-establish state after session expiration can register an
-   * {@code onExpired} command to execute.
-   *
-   * @param onExpired the {@code Command} to register
-   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
-   *     removal.
-   */
-  public Watcher registerExpirationHandler(final Command onExpired) {
-    Watcher watcher = event -> {
-      if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
-        onExpired.execute();
-      }
-    };
-    register(watcher);
-    return watcher;
-  }
-
-  /**
-   * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
-   * registered {@code watcher} will remain registered across re-connects and session expiration
-   * events.
-   *
-   * @param watcher the {@code Watcher} to register
-   */
-  public void register(Watcher watcher) {
-    // watchers is a set, so registering the same watcher again has no additional effect.
-    watchers.add(watcher);
-  }
-
-  /**
-   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
-   * registered.
-   *
-   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
-   * @return whether the given {@code Watcher} was found and removed from the active set
-   */
-  public boolean unregister(Watcher watcher) {
-    // The set's remove() result doubles as the "was it registered" answer.
-    return watchers.remove(watcher);
-  }
-
-  /**
-   * Checks to see if the client might reasonably re-try an operation given the exception thrown
-   * while attempting it.  If the ZooKeeper session should be expired to enable the re-try to
-   * succeed this method will expire it as a side-effect.
-   *
-   * @param e the exception to test
-   * @return true if a retry can be attempted
-   */
-  public boolean shouldRetry(KeeperException e) {
-    if (e instanceof SessionExpiredException) {
-      close();
-    }
-    return ZooKeeperUtils.isRetryable(e);
-  }
-
-  /**
-   * Closes the current connection, if any, expiring the current ZooKeeper session.  Any
-   * subsequent calls to this method will no-op until the next successful {@link #get}.
-   */
-  public synchronized void close() {
-    if (zooKeeper == null) {
-      // Never connected, or already closed: nothing to do.
-      return;
-    }
-    try {
-      zooKeeper.close();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Interrupted trying to close zooKeeper");
-    } finally {
-      // Forget the connection and its session even when closing was interrupted, so the next
-      // get() starts from scratch.
-      zooKeeper = null;
-      sessionState = null;
-    }
-  }
-
-  @VisibleForTesting
-  synchronized boolean isClosed() {
-    // "Closed" means no ZooKeeper handle is currently held: never connected, or close()d since.
-    return zooKeeper == null;
-  }
-
-  @VisibleForTesting
-  ZooKeeper getZooKeeperClientForTests() {
-    // Unsynchronized read of the volatile field: returns the current handle (possibly null)
-    // without attempting to connect.
-    return zooKeeper;
-  }
-}