You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:33 UTC

[4/4] aurora git commit: Revert removal of twitter/commons/zk based leadership code

Revert removal of twitter/commons/zk based leadership code

See discussion here: https://issues.apache.org/jira/browse/AURORA-1840

Reviewed at https://reviews.apache.org/r/54250/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/16e4651d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/16e4651d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/16e4651d

Branch: refs/heads/master
Commit: 16e4651d5ff038dad0e9977edea7c57aeb37fe12
Parents: 8bcad84
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Thu Dec 1 09:01:33 2016 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Thu Dec 1 09:01:33 2016 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   1 +
 build.gradle                                    |  16 +-
 .../aurora/common/zookeeper/Candidate.java      |  78 +++
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ++++
 .../aurora/common/zookeeper/Credentials.java    |  90 +++
 .../apache/aurora/common/zookeeper/Group.java   | 674 +++++++++++++++++++
 .../aurora/common/zookeeper/JsonCodec.java      | 139 ++++
 .../aurora/common/zookeeper/ServerSet.java      |  74 ++
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ++++++++++
 .../aurora/common/zookeeper/ServerSets.java     | 118 ++++
 .../common/zookeeper/SingletonService.java      | 114 ++++
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ++++
 .../common/zookeeper/ZooKeeperClient.java       | 372 ++++++++++
 .../aurora/common/zookeeper/ZooKeeperUtils.java | 167 +++++
 .../testing/BaseZooKeeperClientTest.java        | 140 ++++
 .../zookeeper/testing/BaseZooKeeperTest.java    |  46 ++
 .../zookeeper/testing/ZooKeeperTestServer.java  | 121 ++++
 .../common/zookeeper/CandidateImplTest.java     | 165 +++++
 .../aurora/common/zookeeper/GroupTest.java      | 321 +++++++++
 .../aurora/common/zookeeper/JsonCodecTest.java  | 151 +++++
 .../common/zookeeper/ServerSetImplTest.java     | 258 +++++++
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 ++
 .../zookeeper/SingletonServiceImplTest.java     | 243 +++++++
 .../common/zookeeper/ZooKeeperClientTest.java   | 210 ++++++
 .../common/zookeeper/ZooKeeperUtilsTest.java    | 139 ++++
 config/findbugs/excludeFilter.xml               |   8 -
 docs/features/service-discovery.md              |   2 +-
 docs/reference/scheduler-configuration.md       |   2 +
 .../aurora/scheduler/SchedulerLifecycle.java    |   6 +-
 .../aurora/scheduler/app/SchedulerMain.java     |   4 +-
 .../scheduler/app/ServiceGroupMonitor.java      |  46 ++
 .../CommonsServiceDiscoveryModule.java          | 102 +++
 .../discovery/CommonsServiceGroupMonitor.java   |  59 ++
 .../aurora/scheduler/discovery/Credentials.java |  98 ---
 .../CuratorServiceDiscoveryModule.java          |   8 +-
 .../discovery/CuratorServiceGroupMonitor.java   |   1 +
 .../discovery/CuratorSingletonService.java      |   1 +
 .../discovery/FlaggedZooKeeperConfig.java       |  21 +-
 .../aurora/scheduler/discovery/JsonCodec.java   | 147 ----
 .../discovery/ServiceDiscoveryModule.java       |  20 +-
 .../discovery/ServiceGroupMonitor.java          |  46 --
 .../scheduler/discovery/SingletonService.java   | 114 ----
 .../scheduler/discovery/ZooKeeperConfig.java    |  21 +-
 .../scheduler/discovery/ZooKeeperUtils.java     |  51 --
 .../discovery/testing/BaseZooKeeperTest.java    |  53 --
 .../discovery/testing/ZooKeeperTestServer.java  | 101 ---
 .../scheduler/http/JettyServerModule.java       |   2 +-
 .../aurora/scheduler/http/LeaderRedirect.java   |   4 +-
 .../log/mesos/MesosLogStreamModule.java         |   4 +-
 .../scheduler/SchedulerLifecycleTest.java       |   4 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  52 +-
 .../discovery/AbstractDiscoveryModuleTest.java  |  77 +++
 .../discovery/BaseCuratorDiscoveryTest.java     |  10 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 +
 .../CommonsServiceGroupMonitorTest.java         | 137 ++++
 .../discovery/CuratorDiscoveryModuleTest.java   |  64 +-
 .../discovery/CuratorSingletonServiceTest.java  |   3 +-
 .../scheduler/discovery/JsonCodecTest.java      | 159 -----
 .../discovery/ZooKeeperConfigTest.java          |  17 +-
 .../scheduler/http/AbstractJettyTest.java       |  15 +-
 .../scheduler/http/LeaderRedirectTest.java      |   4 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |   2 +-
 62 files changed, 4871 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 96926f4..7a3d331 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -26,6 +26,7 @@
 - The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are
   upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below.
 
+=======
 0.16.0
 ======
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f257440..2f23b85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -164,7 +164,6 @@ project(':commons') {
   dependencies {
     compile project(':commons-args')
 
-    compile "ch.qos.logback:logback-classic:${logbackRev}"
     compile "com.google.code.findbugs:jsr305:${jsrRev}"
     compile "com.google.code.gson:gson:${gsonRev}"
     compile "com.google.guava:guava:${guavaRev}"
@@ -175,13 +174,17 @@ project(':commons') {
     compile "javax.servlet:javax.servlet-api:${servletRev}"
     compile "joda-time:joda-time:2.9.1"
     compile "org.antlr:stringtemplate:${stringTemplateRev}"
+    compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"
     compile "org.easymock:easymock:3.4"
 
     // There are a few testing support libs in the src/main/java trees that use junit - currently:
+    //   src/main/java/org/apache/aurora/common/zookeeper/testing
     //   src/main/java/org/apache/aurora/common/testing
     compile "junit:junit:${junitRev}"
 
     testCompile "junit:junit:${junitRev}"
+    testCompile "org.powermock:powermock-module-junit4:1.6.4"
+    testCompile "org.powermock:powermock-api-easymock:1.6.4"
   }
 }
 
@@ -347,11 +350,9 @@ dependencies {
   compile project(':commons')
   compile project(':commons-args')
 
-
   compile 'aopalliance:aopalliance:1.0'
-  compile "ch.qos.logback:logback-classic:${logbackRev}"
+  compile 'ch.qos.logback:logback-classic:1.1.3'
   compile "com.google.code.findbugs:jsr305:${jsrRev}"
-  compile "com.google.code.gson:gson:${gsonRev}"
   compile "com.google.inject:guice:${guiceRev}"
   compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
   compile "com.google.protobuf:protobuf-java:${protobufRev}"
@@ -385,15 +386,8 @@ dependencies {
   compile 'org.quartz-scheduler:quartz:2.2.2'
   compile "uno.perk:forward:1.0.0"
 
-  // There are a few testing support libs in the src/main/java trees that use junit - currently:
-  //   src/main/java/org/apache/aurora/common/zookeeper/testing
-  compile "junit:junit:${junitRev}"
-
   testCompile "com.sun.jersey:jersey-client:${jerseyRev}"
   testCompile "junit:junit:${junitRev}"
-  testCompile "org.powermock:powermock-module-junit4:1.6.4"
-  testCompile "org.powermock:powermock-api-easymock:1.6.4"
-
 }
 
 // For normal developer builds, avoid running the often-time-consuming code quality checks.

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..75c1b14
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+  /**
+   * Returns the current group leader by querying ZooKeeper synchronously.
+   *
+   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
+   *     no leader
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading the leader information
+   * @throws InterruptedException if this thread is interrupted getting the leader
+   */
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * Encapsulates a leader that can be elected and subsequently defeated.
+   */
+  interface Leader {
+
+    /**
+     * Called when this leader has been elected.
+     *
+     * @param abdicate a command that can be used to abdicate leadership and force a new election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
+     * external event causes the leader to lose its leadership role (session expiration).
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
+   * Upon election, the {@code onElected} callback will be executed and a command that can be used
+   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
+   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
+   * successfully abdicate are removed from the group and will not be eligible for leadership
+   * election unless {@link #offerLeadership(Leader)} is called again.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @throws JoinException if there was a problem joining the group
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to join the group and determine initial
+   *     election results
+   * @return a supplier that can be queried to find out if this leader is currently elected
+   */
+  public Supplier<Boolean> offerLeadership(Leader leader)
+        throws JoinException, WatchException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..98b5ee4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements leader election for small groups of candidates.  This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
+
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
+    try {
+      return InetAddress.getLocalHost().getHostAddress().getBytes();
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to determine local address!", e);
+      return UNKNOWN_CANDIDATE_DATA;
+    }
+  };
+
+  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+      candidates -> Ordering.natural().min(candidates);
+
+  private final Group group;
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.
+   */
+  public CandidateImpl(Group group) {
+    this.group = Preconditions.checkNotNull(group);
+  }
+
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+    group.watch(memberIds -> {
+      boolean noCandidates = Iterables.isEmpty(memberIds);
+      String memberId = membership.getMemberId();
+
+      if (noCandidates) {
+        LOG.warn("All candidates have temporarily left the group: " + group);
+      } else if (!Iterables.contains(memberIds, memberId)) {
+        LOG.error(
+            "Current member ID {} is not a candidate for leader, current voting: {}",
+            memberId, memberIds);
+      } else {
+        boolean electedLeader = memberId.equals(getLeader(memberIds));
+        boolean previouslyElected = elected.getAndSet(electedLeader);
+
+        if (!previouslyElected && electedLeader) {
+          LOG.info("Candidate {} is now leader of group: {}",
+              membership.getMemberPath(), memberIds);
+
+          leader.onElected(() -> {
+            membership.cancel();
+            abdicated.set(true);
+          });
+        } else if (!electedLeader) {
+          if (previouslyElected) {
+            leader.onDefeated();
+          }
+          LOG.info(
+              "Candidate {} waiting for the next leader election, current voting: {}",
+              membership.getMemberPath(), memberIds);
+        }
+      }
+    });
+
+    return () -> !abdicated.get() && elected.get();
+  }
+
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
new file mode 100644
index 0000000..18319a3
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.commons.lang.builder.EqualsBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulates a user's ZooKeeper credentials.
+ */
+public final class Credentials {
+
+  /**
+   * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
+   *
+   * @param username the username to authenticate with
+   * @param password the password to authenticate with
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials digestCredentials(String username, String password) {
+    MorePreconditions.checkNotBlank(username);
+    Preconditions.checkNotNull(password);
+
+    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+    // Charset explicitly.
+    return new Credentials("digest", (username + ":" + password).getBytes());
+  }
+
+  private final String scheme;
+  private final byte[] authToken;
+
+  public Credentials(String scheme, byte[] authToken) {
+    this.scheme = MorePreconditions.checkNotBlank(scheme);
+    this.authToken = requireNonNull(authToken);
+  }
+
+  /**
+   * Returns the authentication scheme these credentials are for.
+   *
+   * @return the scheme these credentials are for.
+   */
+  public String scheme() {
+    return scheme;
+  }
+
+  /**
+   * Returns the authentication token.
+   *
+   * @return the authentication token.
+   */
+  public byte[] authToken() {
+    return authToken;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof Credentials)) {
+      return false;
+    }
+
+    Credentials other = (Credentials) o;
+    return new EqualsBuilder()
+        .append(scheme, other.scheme())
+        .append(authToken, other.authToken())
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(scheme, authToken);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
new file mode 100644
index 0000000..2720dd1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups.  The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
+
+  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+  private final ZooKeeperClient zkClient;
+  private final ImmutableList<ACL> acl;
+  private final String path;
+
+  private final NodeScheme nodeScheme;
+  private final Predicate<String> nodeNameFilter;
+
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a group rooted at the given {@code path}.  Paths must be absolute and trailing or
+   * duplicate slashes will be normalized.  For example, all the following paths would create a
+   * group at the normalized path /my/distributed/group:
+   * <ul>
+   *   <li>/my/distributed/group
+   *   <li>/my/distributed/group/
+   *   <li>/my/distributed//group
+   * </ul>
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the absolute persistent path that represents this group
+   * @param nodeScheme the scheme that defines how nodes are created
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.acl = ImmutableList.copyOf(acl);
+    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
+
+    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
+    nodeNameFilter = Group.this.nodeScheme::isMember;
+
+    backoffHelper = new BackoffHelper();
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
+   * {@code namePrefix} of 'member_'.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
+   * {@link DefaultScheme} using {@code namePrefix}.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+    this(zkClient, acl, path, new DefaultScheme(namePrefix));
+  }
+
+  public String getMemberPath(String memberId) {
+    return path + "/" + MorePreconditions.checkNotBlank(memberId);
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getMemberId(String nodePath) {
+    MorePreconditions.checkNotBlank(nodePath);
+    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
+        "Not a member of this group[%s]: %s", path, nodePath);
+
+    String memberId = StringUtils.substringAfterLast(nodePath, "/");
+    Preconditions.checkArgument(nodeScheme.isMember(memberId),
+        "Not a group member: %s", memberId);
+    return memberId;
+  }
+
+  /**
+   * Returns the current list of group member ids by querying ZooKeeper synchronously.
+   *
+   * @return the ids of all the present members of this group
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this group's member ids
+   * @throws InterruptedException if this thread is interrupted listing the group members
+   */
+  public Iterable<String> getMemberIds()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
+  }
+
+  /**
+   * Gets the data for one of this groups members by querying ZooKeeper synchronously.
+   *
+   * @param memberId the id of the member whose data to retrieve
+   * @return the data associated with the {@code memberId}
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this member's data
+   * @throws InterruptedException if this thread is interrupted retrieving the member data
+   */
+  public byte[] getMemberData(String memberId)
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    return zkClient.get().getData(getMemberPath(memberId), false, null);
+  }
+
+  /**
+   * Represents membership in a distributed group.
+   */
+  public interface Membership {
+
+    /**
+     * Returns the persistent ZooKeeper path that represents this group.
+     */
+    String getGroupPath();
+
+    /**
+     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberId();
+
+    /**
+     * Returns the full ZooKeeper path to this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberPath();
+
+    /**
+     * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
+     * {@link Group#join()}.
+     *
+     * @return the new membership data
+     * @throws UpdateException if there was a problem updating the membership data
+     */
+    byte[] updateMemberData() throws UpdateException;
+
+    /**
+     * Cancels group membership by deleting the associated ZooKeeper member node.
+     *
+     * @throws JoinException if there is a problem deleting the node
+     */
+    void cancel() throws JoinException;
+  }
+
+  /**
+   * Indicates an error joining a group.
+   */
+  public static class JoinException extends Exception {
+    public JoinException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error updating a group member's data.
+   */
+  public static class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Equivalent to calling {@code join(null, null)}.
+   */
+  public final Membership join() throws JoinException, InterruptedException {
+    return join(NO_MEMBER_DATA, null);
+  }
+
+  /**
+   * Equivalent to calling {@code join(memberData, null)}.
+   */
+  public final Membership join(Supplier<byte[]> memberData)
+      throws JoinException, InterruptedException {
+
+    return join(memberData, null);
+  }
+
+  /**
+   * Equivalent to calling {@code join(null, onLoseMembership)}.
+   */
+  public final Membership join(@Nullable final Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    return join(NO_MEMBER_DATA, onLoseMembership);
+  }
+
+  /**
+   * Joins this group and returns the resulting Membership when successful.  Membership will be
+   * automatically cancelled when the current jvm process dies; however the returned Membership
+   * object can be used to cancel membership earlier.  Unless
+   * {@link Group.Membership#cancel()} is called the membership will
+   * be maintained by re-establishing it silently in the background.
+   *
+   * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
+   * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
+   * membership in the group.
+   *
+   * @param memberData a supplier of the data to store in the member node
+   * @param onLoseMembership a callback to notify when membership is lost
+   * @return a Membership object with the member details
+   * @throws JoinException if there was a problem joining the group
+   * @throws InterruptedException if this thread is interrupted awaiting completion of the join
+   */
+  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    Preconditions.checkNotNull(memberData);
+    ensurePersistentGroupPath();
+
+    final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
+    return backoffHelper.doUntilResult(() -> {
+      try {
+        return groupJoiner.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new JoinException("Interrupted trying to join group at path: " + path, e);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Temporary error trying to join group at path: " + path, e);
+        return null;
+      } catch (KeeperException e) {
+        if (zkClient.shouldRetry(e)) {
+          LOG.warn("Temporary error trying to join group at path: " + path, e);
+          return null;
+        } else {
+          throw new JoinException("Problem joining partition group at path: " + path, e);
+        }
+      }
+    });
+  }
+
+  private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+    backoffHelper.doUntilSuccess(() -> {
+      try {
+        ZooKeeperUtils.ensurePath(zkClient, acl, path);
+        return true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+        return false;
+      } catch (KeeperException e) {
+        if (zkClient.shouldRetry(e)) {
+          LOG.warn("Temporary error ensuring path: " + path, e);
+          return false;
+        } else {
+          throw new JoinException("Problem ensuring group at path: " + path, e);
+        }
+      }
+    });
+  }
+
+  private class ActiveMembership implements Membership {
+    private final Supplier<byte[]> memberData;
+    private final Command onLoseMembership;
+    private String nodePath;
+    private String memberId;
+    private volatile boolean cancelled;
+    private byte[] membershipData;
+
+    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+      this.memberData = memberData;
+      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
+    }
+
+    @Override
+    public String getGroupPath() {
+      return path;
+    }
+
+    @Override
+    public synchronized String getMemberId() {
+      return memberId;
+    }
+
+    @Override
+    public synchronized String getMemberPath() {
+      return nodePath;
+    }
+
+    @Override
+    public synchronized byte[] updateMemberData() throws UpdateException {
+      byte[] membershipData = memberData.get();
+      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
+        try {
+          zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
+          this.membershipData = membershipData;
+        } catch (KeeperException e) {
+          throw new UpdateException("Problem updating membership data.", e);
+        } catch (InterruptedException e) {
+          throw new UpdateException("Interrupted attempting to update membership data.", e);
+        } catch (ZooKeeperConnectionException e) {
+          throw new UpdateException(
+              "Could not connect to the ZooKeeper cluster to update membership data.", e);
+        }
+      }
+      return membershipData;
+    }
+
+    @Override
+    public synchronized void cancel() throws JoinException {
+      if (!cancelled) {
+        try {
+          backoffHelper.doUntilSuccess(() -> {
+            try {
+              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+              return true;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+            } catch (ZooKeeperConnectionException e) {
+              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+              return false;
+            } catch (NoNodeException e) {
+              LOG.info("Membership already cancelled, node at path: " + nodePath +
+                       " has been deleted");
+              return true;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.warn("Temporary error cancelling membership: " + nodePath, e);
+                return false;
+              } else {
+                throw new JoinException("Problem cancelling membership: " + nodePath, e);
+              }
+            }
+          });
+          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Problem cancelling membership: " + nodePath, e);
+        }
+      }
+    }
+
+    private class CancelledException extends IllegalStateException { /* marker */ }
+
+    synchronized Membership join()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (cancelled) {
+        throw new CancelledException();
+      }
+
+      if (nodePath == null) {
+        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+        // registered once.
+        zkClient.registerExpirationHandler(this::tryJoin);
+      }
+
+      byte[] membershipData = memberData.get();
+      String nodeName = nodeScheme.createName(membershipData);
+      CreateMode createMode = nodeScheme.isSequential()
+          ? CreateMode.EPHEMERAL_SEQUENTIAL
+          : CreateMode.EPHEMERAL;
+      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+      memberId = Group.this.getMemberId(nodePath);
+      LOG.info("Set group member ID to " + memberId);
+      this.membershipData = membershipData;
+
+      // Re-join if our ephemeral node goes away due to maliciousness.
+      zkClient.get().exists(nodePath, event -> {
+        if (event.getType() == EventType.NodeDeleted) {
+          tryJoin();
+        }
+      });
+
+      return this;
+    }
+
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+        () -> {
+          try {
+            join();
+            return true;
+          } catch (CancelledException e) {
+            // Lost a cancel race - that's ok.
+            return true;
+          } catch (ZooKeeperConnectionException e) {
+            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+            return false;
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error re-joining group: " + path, e);
+              return false;
+            } else {
+              throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+            }
+          }
+        };
+
+    private synchronized void tryJoin() {
+      onLoseMembership.execute();
+      try {
+        backoffHelper.doUntilSuccess(tryJoin);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
+      }
+    }
+  }
+
+  /**
+   * An interface to an object that listens for changes to a group's membership.
+   */
+  public interface GroupChangeListener {
+
+    /**
+     * Called whenever group membership changes with the new list of member ids.
+     *
+     * @param memberIds the current member ids
+     */
+    void onGroupChange(Iterable<String> memberIds);
+  }
+
+  /**
+   * An interface that dictates the scheme to use for storing and filtering nodes that represent
+   * members of a distributed group.
+   */
+  public interface NodeScheme {
+    /**
+     * Determines if a child node is a member of a group by examining the node's name.
+     *
+     * @param nodeName the name of a child node found in a group
+     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+     */
+    boolean isMember(String nodeName);
+
+    /**
+     * Generates a node name for the node representing this process in the distributed group.
+     *
+     * @param membershipData the data that will be stored in this node
+     * @return the name for the node that will represent this process in the group
+     */
+    String createName(byte[] membershipData);
+
+    /**
+     * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+     *
+     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+     */
+    boolean isSequential();
+  }
+
+  /**
+   * Indicates an error watching a group.
+   */
+  public static class WatchException extends Exception {
+    public WatchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Watches this group for the lifetime of this jvm process.  This method will block until the
+   * current group members are available, notify the {@code groupChangeListener} and then return.
+   * All further changes to the group membership will cause notifications on a background thread.
+   *
+   * @param groupChangeListener the listener to notify of group membership change events
+   * @return A command which, when executed, will stop watching the group.
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+   */
+  public final Command watch(final GroupChangeListener groupChangeListener)
+      throws WatchException, InterruptedException {
+    Preconditions.checkNotNull(groupChangeListener);
+
+    try {
+      ensurePersistentGroupPath();
+    } catch (JoinException e) {
+      throw new WatchException("Failed to create group path: " + path, e);
+    }
+
+    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+    backoffHelper.doUntilSuccess(() -> {
+      try {
+        groupMonitor.watchGroup();
+        return true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.warn("Temporary error trying to watch group at path: " + path, e);
+        return null;
+      } catch (KeeperException e) {
+        if (zkClient.shouldRetry(e)) {
+          LOG.warn("Temporary error trying to watch group at path: " + path, e);
+          return null;
+        } else {
+          throw new WatchException("Problem trying to watch group at path: " + path, e);
+        }
+      }
+    });
+    return groupMonitor::stopWatching;
+  }
+
+  /**
+   * Helps continuously monitor a group for membership changes.
+   */
+  private class GroupMonitor {
+    private final GroupChangeListener groupChangeListener;
+    private volatile boolean stopped = false;
+    private Set<String> members;
+
+    GroupMonitor(GroupChangeListener groupChangeListener) {
+      this.groupChangeListener = groupChangeListener;
+    }
+
+    private final Watcher groupWatcher = event -> {
+      if (event.getType() == EventType.NodeChildrenChanged) {
+        tryWatchGroup();
+      }
+    };
+
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+        () -> {
+          try {
+            watchGroup();
+            return true;
+          } catch (ZooKeeperConnectionException e) {
+            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+            return false;
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error re-watching group: " + path, e);
+              return false;
+            } else {
+              throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+            }
+          }
+        };
+
+    private void tryWatchGroup() {
+      if (stopped) {
+        return;
+      }
+
+      try {
+        backoffHelper.doUntilSuccess(tryWatchGroup);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+      }
+    }
+
+    private void watchGroup()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (stopped) {
+        return;
+      }
+
+      List<String> children = zkClient.get().getChildren(path, groupWatcher);
+      setMembers(Iterables.filter(children, nodeNameFilter));
+    }
+
+    private void stopWatching() {
+      // TODO(William Farner): Cancel the watch when
+      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+      LOG.info("Stopping watch on " + this);
+      stopped = true;
+    }
+
+    synchronized void setMembers(Iterable<String> members) {
+      if (stopped) {
+        LOG.info("Suppressing membership update, no longer watching " + this);
+        return;
+      }
+
+      if (this.members == null) {
+        // Reset our watch on the group if session expires - only needs to be registered once.
+        zkClient.registerExpirationHandler(this::tryWatchGroup);
+      }
+
+      Set<String> membership = ImmutableSet.copyOf(members);
+      if (!membership.equals(this.members)) {
+        groupChangeListener.onGroupChange(members);
+        this.members = membership;
+      }
+    }
+  }
+
+  /**
+   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+   * prefix is "member_", the node's full path will look something like
+   * {@code /discovery/servicename/member_0000000007}.
+   */
+  public static class DefaultScheme implements NodeScheme {
+    private final String namePrefix;
+    private final Pattern namePattern;
+
+    /**
+     * Creates a sequential node scheme based on the given node name prefix.
+     *
+     * @param namePrefix the prefix for the names of the member nodes
+     */
+    public DefaultScheme(String namePrefix) {
+      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+    }
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return namePattern.matcher(nodeName).matches();
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return namePrefix;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return true;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Group " + path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
new file mode 100644
index 0000000..9d31608
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonParseException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import static java.util.Objects.requireNonNull;
+
+class JsonCodec implements Codec<ServiceInstance> {
+
+  private static void assertRequiredField(String fieldName, Object fieldValue) {
+    if (fieldValue == null) {
+      throw new JsonParseException(String.format("Field %s is required", fieldName));
+    }
+  }
+
+  private static class EndpointSchema {
+    private final String host;
+    private final Integer port;
+
+    EndpointSchema(Endpoint endpoint) {
+      host = endpoint.getHost();
+      port = endpoint.getPort();
+    }
+
+    Endpoint asEndpoint() {
+      assertRequiredField("host", host);
+      assertRequiredField("port", port);
+
+      return new Endpoint(host, port);
+    }
+  }
+
+  private static class ServiceInstanceSchema {
+    private final EndpointSchema serviceEndpoint;
+    private final Map<String, EndpointSchema> additionalEndpoints;
+    private final Status status;
+    private final @Nullable Integer shard;
+
+    ServiceInstanceSchema(ServiceInstance instance) {
+      serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+      if (instance.isSetAdditionalEndpoints()) {
+        additionalEndpoints =
+            Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+      } else {
+        additionalEndpoints = ImmutableMap.of();
+      }
+      status  = instance.getStatus();
+      shard = instance.isSetShard() ? instance.getShard() : null;
+    }
+
+    ServiceInstance asServiceInstance() {
+      assertRequiredField("serviceEndpoint", serviceEndpoint);
+      assertRequiredField("status", status);
+
+      Map<String, EndpointSchema> extraEndpoints =
+          additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
+
+      ServiceInstance instance =
+          new ServiceInstance(
+              serviceEndpoint.asEndpoint(),
+              Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint),
+              status);
+      if (shard != null) {
+        instance.setShard(shard);
+      }
+      return instance;
+    }
+  }
+
+  private static final Charset ENCODING = Charsets.UTF_8;
+
+  private final Gson gson;
+
+  JsonCodec() {
+    this(new Gson());
+  }
+
+  JsonCodec(Gson gson) {
+    this.gson = requireNonNull(gson);
+  }
+
+  @Override
+  public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+    Writer writer = new OutputStreamWriter(sink, ENCODING);
+    try {
+      gson.toJson(new ServiceInstanceSchema(instance), writer);
+    } catch (JsonIOException e) {
+      throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
+    }
+    writer.flush();
+  }
+
+  @Override
+  public ServiceInstance deserialize(InputStream source) throws IOException {
+    InputStreamReader reader = new InputStreamReader(source, ENCODING);
+    try {
+      @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
+      if (schema == null) {
+        throw new IOException("JSON did not include a ServiceInstance object");
+      }
+      return schema.asServiceInstance();
+    } catch (JsonParseException e) {
+      throw new IOException("Problem parsing JSON ServiceInstance.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..aeea02d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+
+/**
+ * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
+ * common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
+ * rendezvous data to zookeeper so that standard clients can interoperate.
+ */
+public interface ServerSet {
+
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * This is the default encoding for service instance data in ZooKeeper.
+   */
+  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints and additional endpoints keyed by their logical name
+   * @return an EndpointStatus object that allows the endpoint to adjust its status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   */
+  EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException;
+
+  /**
+   * A handle to a service endpoint's status data that allows updating it to track current events.
+   */
+  interface EndpointStatus {
+
+    /**
+     * Removes the endpoint from the server set.
+     *
+     * @throws UpdateException if there was a problem leaving the ServerSet.
+     */
+    void leave() throws UpdateException;
+  }
+
+  /**
+   * Indicates an error updating a service's status information.
+   */
+  class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ace4980
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
+ */
+public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
+    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
+    Group.Membership membership = group.join(serviceInstanceSupplier);
+
+    return () -> memberStatus.leave(membership);
+  }
+
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (Group.WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+    }
+
+    synchronized void leave(Group.Membership membership) throws UpdateException {
+      try {
+        membership.cancel();
+      } catch (Group.JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD status", e);
+      }
+    }
+
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
+            serviceInstance + "to a byte[]", e);
+      }
+    }
+  }
+
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  private static class ServiceInstanceDeletedException extends RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    public Command watch() throws Group.WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
+
+      try {
+        return group.watch(this::notifyGroupChange);
+      } catch (Group.WatchException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      } catch (InterruptedException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+    }
+
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        return backoffHelper.doUntilResult(() -> {
+          try {
+            byte[] data = zkClient.get().getData(nodePath, false, null);
+            return ServerSets.deserializeServiceInstance(data, codec);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ServiceInstanceFetchException(
+                "Interrupted updating service data for: " + nodePath, e);
+          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            LOG.warn("Temporary error trying to updating service data for: " + nodePath, e);
+            return null;
+          } catch (NoNodeException e) {
+            invalidateNodePath(nodePath);
+            throw new ServiceInstanceDeletedException(nodePath);
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
+              return null;
+            } else {
+              throw new ServiceInstanceFetchException(
+                  "Failed to update service data for: " + nodePath, e);
+            }
+          } catch (IOException e) {
+            throw new ServiceInstanceFetchException(
+                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    private void rebuildServerSet() {
+      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
+        memberId -> {
+          // This get will trigger a fetch
+          try {
+            return servicesByMemberId.getUnchecked(memberId);
+          } catch (UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (!(cause instanceof ServiceInstanceDeletedException)) {
+              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+              throw new IllegalStateException(
+                  "Unexpected error fetching member data for: " + memberId, e);
+            }
+            return null;
+          }
+        };
+
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+      // if the server's status has not changed, we can skip any onChange updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warn("server set empty for path " + group.getPath());
+        } else {
+          if (serverSet == null) {
+            LOG.info("received initial membership {}", currentServerSet);
+          } else {
+            logChange(currentServerSet);
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.info(message.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..01a54a5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Common ServerSet related functions
+ */
+public class ServerSets {
+
+  private ServerSets() {
+    // Utility class.
+  }
+
+  /**
+   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+   */
+  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+      ServerSets::toEndpoint;
+
+  /**
+   * Creates a server set that registers at a single path applying the given ACL to all nodes
+   * created in the path.
+   *
+   * @param zkClient ZooKeeper client to register with.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, java.util.Set)
+   * @return A server set that registers at {@code zkPath}.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+    Preconditions.checkNotNull(zkClient);
+    MorePreconditions.checkNotBlank(acl);
+    MorePreconditions.checkNotBlank(zkPath);
+
+    return new ServerSetImpl(zkClient, acl, zkPath);
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, with given endpoints and codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  public static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  public static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance =
+        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  public static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Creates an endpoint for the given InetSocketAddress.
+   *
+   * @param address the target address to create the endpoint for
+   */
+  public static Endpoint toEndpoint(InetSocketAddress address) {
+    return new Endpoint(address.getHostName(), address.getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..7f962eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
+ */
+public interface SingletonService {
+
+  /**
+   * Indicates an error attempting to lead a group of servers.
+   */
+  class LeadException extends Exception {
+    public LeadException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to advertise leadership of a group of servers.
+   */
+  class AdvertiseException extends Exception {
+    public AdvertiseException(String message) {
+      super(message);
+    }
+
+    public AdvertiseException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
+   */
+  class LeaveException extends Exception {
+    public LeaveException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in the service.
+   * @param additionalEndpoints Additional endpoints that are available on the host.
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
+   * @throws InterruptedException If the thread watching/joining the group was interrupted.
+   */
+  void lead(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      LeadershipListener listener)
+      throws LeadException, InterruptedException;
+
+  /**
+   * A listener to be notified of changes in the leadership status.
+   * Implementers should be careful to avoid blocking operations in these callbacks.
+   */
+  interface LeadershipListener {
+
+    /**
+     * Notifies the listener that is is current leader.
+     *
+     * @param control A controller handle to advertise and/or leave advertised presence.
+     */
+    void onLeading(LeaderControl control);
+
+    /**
+     * Notifies the listener that it is no longer leader.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * A controller for the state of the leader.  This will be provided to the leader upon election,
+   * which allows the leader to decide when to advertise as leader of the server set and terminate
+   * leadership at will.
+   */
+  interface LeaderControl {
+
+    /**
+     * Advertises the leader's server presence to clients.
+     *
+     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+     *     of the server set.
+     * @throws InterruptedException If interrupted while advertising.
+     */
+    void advertise() throws AdvertiseException, InterruptedException;
+
+    /**
+     * Leaves candidacy for leadership, removing advertised server presence if applicable.
+     *
+     * @throws LeaveException If the leader's status could not be updated or there was an error
+     *     abdicating server set leadership.
+     */
+    void leave() throws LeaveException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+public class SingletonServiceImpl implements SingletonService {
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  /**
+   * Creates a candidate that can be combined with an existing server set to form a singleton
+   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+  }
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
+   * advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election.
+   * @param candidate The candidacy to use to vie for election.
+   */
+  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+    this.serverSet = Preconditions.checkNotNull(serverSet);
+    this.candidate = Preconditions.checkNotNull(candidate);
+  }
+
+  @Override
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final LeadershipListener listener)
+                   throws LeadException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    try {
+      candidate.offerLeadership(new Leader() {
+        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+          listener.onLeading(new LeaderControl() {
+            ServerSet.EndpointStatus endpointStatus = null;
+            final AtomicBoolean left = new AtomicBoolean(false);
+
+            // Methods are synchronized to prevent simultaneous invocations.
+            @Override public synchronized void advertise()
+                throws AdvertiseException, InterruptedException {
+
+              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+              try {
+                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+              } catch (JoinException e) {
+                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+              }
+            }
+
+            @Override public synchronized void leave() throws LeaveException {
+              Preconditions.checkState(left.compareAndSet(false, true),
+                  "Cannot leave more than once.");
+              if (endpointStatus != null) {
+                try {
+                  endpointStatus.leave();
+                } catch (ServerSet.UpdateException e) {
+                  throw new LeaveException("Problem updating endpoint status for abdicating leader " +
+                      "at endpoint " + endpoint, e);
+                }
+              }
+              try {
+                abdicate.execute();
+              } catch (JoinException e) {
+                throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
+              }
+            }
+          });
+        }
+
+        @Override public void onDefeated() {
+          listener.onDefeated();
+        }
+      });
+    } catch (JoinException e) {
+      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+    } catch (Group.WatchException e) {
+      throw new LeadException("Problem getting initial membership list for leadership group.", e);
+    }
+  }
+}