You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:32 UTC

[18/37] aurora git commit: Import of Twitter Commons.

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
new file mode 100644
index 0000000..646fab9
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
@@ -0,0 +1,175 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Ordering;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.UpdateException;
+import com.twitter.common.zookeeper.Group.WatchException;
+import org.apache.zookeeper.data.ACL;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
+ * A typical usage would have a client on each of several hosts joining a logical partition (a
+ * "partition group") that represents some shared work.  Clients could then process a subset of a
+ * full body of work by testing any given item of work with their partition filter.
+ *
+ * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
+ * partition as explained in {@link #join()}.
+ *
+ * @author John Sirois
+ */
+public class Partitioner {
+
+  private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());
+
+  private volatile int groupSize;
+  private volatile int groupIndex;
+  private final Group group;
+
+  /**
+   * Constructs a representation of a partition group but does not join it.  Note that the partition
+   * group path will be created as a persistent zookeeper path if it does not already exist.
+   *
+   * @param zkClient a client to use for joining the partition group and watching its membership
+   * @param acl the acl for this partition group
+   * @param path a zookeeper path that represents the partition group
+   */
+  public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
+    group = new Group(zkClient, acl, path);
+  }
+
+  @VisibleForTesting
+  int getGroupSize() {
+    return groupSize;
+  }
+
+  /**
+   * Represents a slice of a partition group.  The partition is dynamic and will adjust its size as
+   * members join and leave its partition group.
+   */
+  public abstract static class Partition implements Predicate<Long>, Membership {
+
+    /**
+     * Returns {@code true} if the given {@code value} is a member of this partition at this time.
+     */
+    public abstract boolean isMember(long value);
+
+    /**
+     * Gets number of members in the group at this time.
+     *
+     * @return number of members in the ZK group at this time.
+     */
+    public abstract int getNumPartitions();
+
+    /**
+     * Evaluates partition membership based on the given {@code value}'s hash code.  If the value
+     * is null it is never a member of a partition.
+     */
+    boolean isMember(Object value) {
+      return (value != null) && isMember(value.hashCode());
+    }
+
+    /**
+     * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
+     * overhead.
+     */
+    @Override
+    public boolean apply(@Nullable Long input) {
+      return (input != null) && isMember(input);
+    }
+  }
+
+  /**
+   * Attempts to join the partition group and claim a slice.  When successful, a predicate is
+   * returned that can be used to test whether or not an item belongs to this partition.  The
+   * predicate is dynamic such that as the group is further partitioned or partitions merge the
+   * predicate will claim a narrower or wider swath of the partition space respectively.  Partition
+   * creation and merging is not instantaneous and clients should expect independent partitions to
+   * claim ownership of some items when partition membership is in flux.  It is only in the steady
+   * state that a client should expect independent partitions to divide the partition space evenly
+   * and without overlap.
+   *
+   * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
+   *
+   * @return the partition representing the slice of the partition group this member can claim
+   * @throws JoinException if there was a problem joining the partition group
+   * @throws InterruptedException if interrupted while waiting to join the partition group
+   */
+  public final Partition join() throws JoinException, InterruptedException {
+    final Membership membership = group.join();
+    try {
+      group.watch(createGroupChangeListener(membership));
+    } catch (WatchException e) {
+      membership.cancel();
+      throw new JoinException("Problem establishing watch on group after joining it", e);
+    }
+    return new Partition() {
+      @Override public boolean isMember(long value) {
+        return (value % groupSize) == groupIndex;
+      }
+
+      @Override public int getNumPartitions() {
+        return groupSize;
+      }
+
+      @Override public String getGroupPath() {
+        return membership.getGroupPath();
+      }
+
+      @Override public String getMemberId() {
+        return membership.getMemberId();
+      }
+
+      @Override public String getMemberPath() {
+        return membership.getMemberPath();
+      }
+
+      @Override public byte[] updateMemberData() throws UpdateException {
+        return membership.updateMemberData();
+      }
+
+      @Override public void cancel() throws JoinException {
+        membership.cancel();
+      }
+    };
+  }
+
+  @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
+    return new GroupChangeListener() {
+      @Override public void onGroupChange(Iterable<String> memberIds) {
+        List<String> members = Ordering.natural().sortedCopy(memberIds);
+        int newSize = members.size();
+        int newIndex = members.indexOf(membership.getMemberId());
+
+        LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
+            membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));
+
+        groupSize = newSize;
+        groupIndex = newIndex;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..b6a0686
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
@@ -0,0 +1,117 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A logical set of servers registered in ZooKeeper.  Intended to be used by both servers in a
+ * common service and their clients.
+ *
+ * TODO(William Farner): Explore decoupling this from thrift.
+ */
+public interface ServerSet extends DynamicHostSet<ServiceInstance> {
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints any additional endpoints keyed by their logical name
+   * @param status the current service status
+   * @return an EndpointStatus object that allows the endpoint to adjust its status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)}
+   */
+  @Deprecated
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Status status) throws JoinException, InterruptedException;
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints any additional endpoints keyed by their logical name
+   * @return an EndpointStatus object that allows the endpoint to adjust its status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   */
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException;
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints any additional endpoints keyed by their logical name
+   * @param shardId Unique shard identifier for this member of the service.
+   * @return an EndpointStatus object that allows the endpoint to adjust its status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   */
+  EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      int shardId) throws JoinException, InterruptedException;
+
+  /**
+   * A handle to a service endpoint's status data that allows updating it to track current events.
+   */
+  public interface EndpointStatus {
+
+    /**
+     * Attempts to update the status of the service endpoint associated with this endpoint.  If the
+     * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set.
+     *
+     * @param status the current status of the endpoint
+     * @throws UpdateException if there was a problem writing the update
+     * @deprecated Support for mutable status is deprecated. Please use {@link #leave()}
+     */
+    @Deprecated
+    void update(Status status) throws UpdateException;
+
+    /**
+     * Removes the endpoint from the server set.
+     *
+     * @throws UpdateException if there was a problem leaving the ServerSet.
+     */
+    void leave() throws UpdateException;
+  }
+
+  /**
+   * Indicates an error updating a service's status information.
+   */
+  public static class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public UpdateException(String message) {
+      super(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ec6b3f7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,609 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.gson.Gson;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Function;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.io.Codec;
+import com.twitter.common.io.CompatibilityCodec;
+import com.twitter.common.io.ThriftCodec;
+import com.twitter.common.util.BackoffHelper;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet}.
+ */
+public class ServerSetImpl implements ServerSet {
+  private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName());
+
+  @CmdLine(name = "serverset_encode_json",
+           help = "If true, use JSON for encoding server set information."
+               + " Defaults to true (use JSON).")
+  private static final Arg<Boolean> ENCODE_JSON = Arg.create(true);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, new Group(zkClient, acl, path), createDefaultCodec());
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, createDefaultCodec());
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException {
+
+    LOG.log(Level.WARNING,
+        "Joining a ServerSet without a shard ID is deprecated and will soon break.");
+    return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      int shardId) throws JoinException, InterruptedException {
+
+    return join(endpoint, additionalEndpoints, Optional.of(shardId));
+  }
+
+  private EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Optional<Integer> shardId) throws JoinException, InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    final MemberStatus memberStatus =
+        new MemberStatus(endpoint, additionalEndpoints, shardId);
+    Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() {
+      @Override public byte[] get() {
+        return memberStatus.serializeServiceInstance();
+      }
+    };
+    final Membership membership = group.join(serviceInstanceSupplier);
+
+    return new EndpointStatus() {
+      @Override public void update(Status status) throws UpdateException {
+        checkNotNull(status);
+        LOG.warning("This method is deprecated. Please use leave() instead.");
+        if (status == Status.DEAD) {
+          leave();
+        } else {
+          LOG.warning("Status update has been ignored");
+        }
+      }
+
+      @Override public void leave() throws UpdateException {
+        memberStatus.leave(membership);
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Status status) throws JoinException, InterruptedException {
+
+    LOG.warning("This method is deprecated. Please do not specify a status field.");
+    if (status != Status.ALIVE) {
+      LOG.severe("**************************************************************************\n"
+          + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n"
+          + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status
+          + "\n**************************************************************************");
+    }
+    return join(endpoint, additionalEndpoints);
+  }
+
+  /**
+   * Starts watching this server set, reporting membership changes to the given {@code monitor}.
+   *
+   * @param monitor the monitor to notify when the set of service instances changes
+   * @return the command returned by the underlying group watch
+   * @throws MonitorException if the ZooKeeper watch could not be established or the calling
+   *     thread was interrupted; in the latter case the thread's interrupt status is restored
+   */
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      // Wrapping the exception would otherwise hide the interruption; restore the interrupt
+      // status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    LOG.warning("This method is deprecated. Please use watch instead.");
+    watch(monitor);
+  }
+
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+    private final Optional<Integer> shardId;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints,
+        Optional<Integer> shardId) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+      this.shardId = shardId;
+    }
+
+    synchronized void leave(Membership membership) throws UpdateException {
+      try {
+        membership.cancel();
+      } catch (JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD status", e);
+      }
+    }
+
+    /**
+     * Builds a {@link ServiceInstance} for this member, always with {@link Status#ALIVE}, and
+     * serializes it with the server set's codec.  The shard is only set when a shard id was
+     * supplied at join time.
+     *
+     * @return the serialized service instance
+     * @throws IllegalStateException if the codec fails to serialize the instance
+     */
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      if (shardId.isPresent()) {
+        serviceInstance.setShard(shardId.get());
+      }
+
+      LOG.fine("updating endpoint data to:\n\t" + serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        // Note the leading space in " to a byte[]": without it the struct's string form runs
+        // straight into the rest of the message.
+        throw new IllegalStateException("Unexpected problem serializing thrift struct "
+            + serviceInstance + " to a byte[]", e);
+      }
+    }
+  }
+
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  private static class ServiceInstanceDeletedException extends RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    public Command watch() throws WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() {
+        @Override public void execute() {
+          // Servers may have changed Status while we were disconnected from ZooKeeper, check and
+          // re-register our node watches.
+          rebuildServerSet();
+        }
+      });
+
+      try {
+        return group.watch(new GroupChangeListener() {
+          @Override public void onGroupChange(Iterable<String> memberIds) {
+            notifyGroupChange(memberIds);
+          }
+        });
+      } catch (WatchException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      } catch (InterruptedException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+    }
+
+    /**
+     * Reads and deserializes the service instance data stored at {@code nodePath}, driving the
+     * fetch through the backoff helper.
+     *
+     * <p>NOTE(review): the supplier returns {@code null} on temporary errors, presumably so that
+     * {@code doUntilResult} retries after a backoff; confirm against BackoffHelper.
+     *
+     * @param nodePath the ZooKeeper path of the member node to read
+     * @return the deserialized service instance
+     * @throws ServiceInstanceDeletedException if the node no longer exists; the matching cache
+     *     entry is invalidated first
+     * @throws ServiceInstanceFetchException if interrupted (the interrupt status is restored), on
+     *     a ZooKeeper error that should not be retried, or if the data cannot be deserialized
+     */
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() {
+          @Override public ServiceInstance get() {
+            try {
+              byte[] data = zkClient.get().getData(nodePath, false, null);
+              return ServerSets.deserializeServiceInstance(data, codec);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new ServiceInstanceFetchException(
+                  "Interrupted updating service data for: " + nodePath, e);
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING,
+                  "Temporary error trying to update service data for: " + nodePath, e);
+              return null;
+            } catch (NoNodeException e) {
+              // The member went away between the group notification and this read.
+              invalidateNodePath(nodePath);
+              throw new ServiceInstanceDeletedException(nodePath);
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING,
+                    "Temporary error trying to update service data for: " + nodePath, e);
+                return null;
+              } else {
+                throw new ServiceInstanceFetchException(
+                    "Failed to update service data for: " + nodePath, e);
+              }
+            } catch (IOException e) {
+              throw new ServiceInstanceFetchException(
+                  "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+            }
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    private void rebuildServerSet() {
+      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
+        new Function<String, ServiceInstance>() {
+          @Override public ServiceInstance apply(String memberId) {
+            // This get will trigger a fetch
+            try {
+              return servicesByMemberId.getUnchecked(memberId);
+            } catch (UncheckedExecutionException e) {
+              Throwable cause = e.getCause();
+              if (!(cause instanceof ServiceInstanceDeletedException)) {
+                Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+                throw new IllegalStateException(
+                    "Unexpected error fetching member data for: " + memberId, e);
+              }
+              return null;
+            }
+          }
+        };
+
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+      // if the server's status has not changed, we can skip any onChange updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warning("server set empty for path " + group.getPath());
+        } else {
+          if (LOG.isLoggable(Level.INFO)) {
+            if (serverSet == null) {
+              LOG.info("received initial membership " + currentServerSet);
+            } else {
+              logChange(Level.INFO, currentServerSet);
+            }
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.log(level, message.toString());
+    }
+  }
+
+  /**
+   * Plain holder for the host and port of an {@link Endpoint}, used by
+   * {@link ServiceInstanceSchema} to describe each endpoint of a service instance.
+   */
+  private static class EndpointSchema {
+    // NOTE(review): instances of this class are handed to Gson by AdaptedJsonCodec, so these
+    // field names presumably double as the JSON keys -- confirm before renaming or reordering.
+    final String host;
+    final Integer port;
+
+    /**
+     * Copies the host and port out of {@code endpoint}.
+     *
+     * @param endpoint the endpoint to mirror; must not be null.
+     */
+    EndpointSchema(Endpoint endpoint) {
+      Preconditions.checkNotNull(endpoint);
+      this.host = endpoint.getHost();
+      this.port = endpoint.getPort();
+    }
+
+    /** Returns the host copied from the endpoint. */
+    String getHost() {
+      return host;
+    }
+
+    /** Returns the port copied from the endpoint, as a boxed value. */
+    Integer getPort() {
+      return port;
+    }
+  }
+
+  /**
+   * Plain holder mirroring a {@link ServiceInstance}: its primary endpoint, its named additional
+   * endpoints, its status and its optional shard.  This is the type {@code AdaptedJsonCodec}
+   * passes to Gson in place of the thrift struct.
+   */
+  private static class ServiceInstanceSchema {
+    // NOTE(review): these field names presumably double as the JSON keys when Gson handles this
+    // class -- confirm before renaming or reordering.
+    final EndpointSchema serviceEndpoint;
+    final Map<String, EndpointSchema> additionalEndpoints;
+    final Status status;
+    // Null when the source instance has no shard set.
+    final Integer shard;
+
+    /**
+     * Copies the fields of {@code instance} into schema form.
+     *
+     * @param instance the service instance to mirror; its service endpoint must not be null.
+     */
+    ServiceInstanceSchema(ServiceInstance instance) {
+      this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+      if (instance.getAdditionalEndpoints() != null) {
+        // transformValues yields a view over the instance's map rather than a copy.
+        this.additionalEndpoints = Maps.transformValues(
+            instance.getAdditionalEndpoints(),
+            new Function<Endpoint, EndpointSchema>() {
+              @Override public EndpointSchema apply(Endpoint endpoint) {
+                return new EndpointSchema(endpoint);
+              }
+            }
+        );
+      } else {
+        // A missing endpoint map is represented as an empty one.
+        this.additionalEndpoints = Maps.newHashMap();
+      }
+      this.status  = instance.getStatus();
+      // Only record the shard when it was explicitly set on the instance.
+      this.shard = instance.isSetShard() ? instance.getShard() : null;
+    }
+
+    /** Returns the schema form of the primary endpoint. */
+    EndpointSchema getServiceEndpoint() {
+      return serviceEndpoint;
+    }
+
+    /** Returns the schema form of the additional endpoints, keyed by name. */
+    Map<String, EndpointSchema> getAdditionalEndpoints() {
+      return additionalEndpoints;
+    }
+
+    /** Returns the status copied from the instance. */
+    Status getStatus() {
+      return status;
+    }
+
+    /** Returns the shard, or null when the instance had none set. */
+    Integer getShard() {
+      return shard;
+    }
+  }
+
+  /**
+   * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the
+   * __isset_bit_vector internal thrift struct field that tracks primitive types.
+   *
+   * <p>Input that cannot be turned into a service instance (empty input, unparseable JSON, or an
+   * endpoint without a port) is reported from {@link #deserialize(InputStream)} as an
+   * {@link IOException} rather than as an unchecked exception.
+   */
+  private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
+    private static final Charset ENCODING = Charsets.UTF_8;
+    private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class;
+    private final Gson gson = new Gson();
+
+    /**
+     * Writes {@code instance} to {@code sink} as JSON in the codec's encoding and flushes the
+     * writer.  The sink is not closed.
+     *
+     * @param instance the service instance to encode.
+     * @param sink the stream to write the JSON to.
+     * @throws IOException if flushing the sink fails.
+     */
+    @Override
+    public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+      Writer w = new OutputStreamWriter(sink, ENCODING);
+      gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
+      w.flush();
+    }
+
+    /**
+     * Reads a JSON encoded service instance from {@code source}.
+     *
+     * @param source the stream holding the JSON document.
+     * @return the decoded service instance; its shard is only set when the JSON carried one.
+     * @throws IOException if the input is empty, is not parseable JSON, or lacks a service
+     *     endpoint or a port for any endpoint.
+     */
+    @Override
+    public ServiceInstance deserialize(InputStream source) throws IOException {
+      ServiceInstanceSchema output;
+      try {
+        output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS);
+      } catch (com.google.gson.JsonParseException e) {
+        // Gson signals syntax and read errors with unchecked exceptions; surface them through
+        // the checked exception this method declares.
+        throw new IOException("Malformed JSON service instance", e);
+      }
+      if (output == null) {
+        // Gson yields null when there is no document to read.
+        throw new IOException("Empty JSON service instance");
+      }
+
+      Endpoint primary = toEndpoint("serviceEndpoint", output.getServiceEndpoint());
+
+      // A document without additional endpoints decodes to an empty map, mirroring how
+      // ServiceInstanceSchema encodes an instance whose endpoint map is null.
+      ImmutableMap.Builder<String, Endpoint> additional = ImmutableMap.builder();
+      Map<String, EndpointSchema> additionalSchemas = output.getAdditionalEndpoints();
+      if (additionalSchemas != null) {
+        for (Map.Entry<String, EndpointSchema> entry : additionalSchemas.entrySet()) {
+          additional.put(entry.getKey(), toEndpoint(entry.getKey(), entry.getValue()));
+        }
+      }
+
+      ServiceInstance instance =
+          new ServiceInstance(primary, additional.build(), output.getStatus());
+      if (output.getShard() != null) {
+        instance.setShard(output.getShard());
+      }
+      return instance;
+    }
+
+    /**
+     * Converts a decoded endpoint schema back to an {@link Endpoint}.
+     *
+     * @param name the name of the endpoint, used in the error message.
+     * @param schema the decoded schema, possibly null or incomplete.
+     * @return the equivalent endpoint.
+     * @throws IOException if the schema is missing or carries no port.
+     */
+    private static Endpoint toEndpoint(String name, EndpointSchema schema) throws IOException {
+      if (schema == null || schema.getPort() == null) {
+        throw new IOException("JSON service instance has no port for endpoint " + name);
+      }
+      return new Endpoint(schema.getHost(), schema.getPort());
+    }
+  }
+
+  private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) {
+    final Codec<ServiceInstance> json = new AdaptedJsonCodec();
+    final Codec<ServiceInstance> thrift =
+        ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL);
+    final Predicate<byte[]> recognizer = new Predicate<byte[]>() {
+      public boolean apply(byte[] input) {
+        return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding;
+      }
+    };
+
+    if (useJsonEncoding) {
+      return CompatibilityCodec.create(json, thrift, 2, recognizer);
+    }
+    return CompatibilityCodec.create(thrift, json, 2, recognizer);
+  }
+
+  /**
+   * Creates a codec for {@link ServiceInstance} objects that uses Thrift binary encoding, and can
+   * decode both Thrift and JSON encodings.
+   *
+   * <p>Delegates to {@code createCodec(false)}.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createThriftCodec() {
+    return createCodec(false);
+  }
+
+  /**
+   * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode
+   * both Thrift and JSON encodings.
+   *
+   * <p>Delegates to {@code createCodec(true)}.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createJsonCodec() {
+    return createCodec(true);
+  }
+
+  /**
+   * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON
+   * encoding, depending on the value of the {@code ENCODE_JSON} command line argument (JSON when
+   * it is {@code true}, Thrift otherwise), and can decode both Thrift and JSON encodings.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createDefaultCodec() {
+    return createCodec(ENCODE_JSON.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..370ab6b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
@@ -0,0 +1,135 @@
+package com.twitter.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.Function;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.io.Codec;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * Common ServerSet related functions
+ */
+public class ServerSets {
+
+  private ServerSets() {
+    // Utility class.
+  }
+
+  /**
+   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+   */
+  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+      new Function<InetSocketAddress, Endpoint>() {
+        @Override public Endpoint apply(InetSocketAddress address) {
+          return ServerSets.toEndpoint(address);
+        }
+      };
+
+  /**
+   * Creates a server set that registers at a single path applying the given ACL to all nodes
+   * created in the path.
+   *
+   * @param zkClient ZooKeeper client to register with.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, java.util.Set)
+   * @return A server set that registers at {@code zkPath}.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+    return create(zkClient, acl, ImmutableSet.of(zkPath));
+  }
+
+  /**
+   * Creates a server set that registers at one or multiple paths applying the given ACL to all
+   * nodes created in the paths.
+   *
+   * @param zkClient ZooKeeper client to register with.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+   * @param zkPaths Paths to register at, must be non-empty.
+   * @return A server set that registers at the given {@code zkPath}s.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) {
+    Preconditions.checkNotNull(zkClient);
+    MorePreconditions.checkNotBlank(acl);
+    MorePreconditions.checkNotBlank(zkPaths);
+
+    if (zkPaths.size() == 1) {
+      return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths));
+    } else {
+      ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
+      for (String path : zkPaths) {
+        builder.add(new ServerSetImpl(zkClient, acl, path));
+      }
+      return new CompoundServerSet(builder.build());
+    }
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, with given endpoints and codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  public static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  public static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance =
+        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  public static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Creates an endpoint for the given InetSocketAddress.
+   *
+   * @param address the target address to create the endpoint for
+   */
+  public static Endpoint toEndpoint(InetSocketAddress address) {
+    return new Endpoint(address.getHostName(), address.getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..00b8b53
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
@@ -0,0 +1,318 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Candidate.Leader;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
+import com.twitter.common.zookeeper.ServerSet.UpdateException;
+import com.twitter.thrift.Status;
+
+/**
+ * A service that uses master election to only allow a single instance of the server to join
+ * the {@link ServerSet} at a time.
+ */
+public class SingletonService {
+  private static final Logger LOG = Logger.getLogger(SingletonService.class.getName());
+
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  /**
+   * Creates a candidate that can be paired with an existing server set to form a singleton
+   * service via {@link #SingletonService(ServerSet, Candidate)}.  The candidate is backed by a
+   * group under {@code servicePath} whose nodes use {@link #LEADER_ELECT_NODE_PREFIX}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    Group electionGroup = new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX);
+    return new CandidateImpl(electionGroup);
+  }
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default
+   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   */
+  public SingletonService(ZooKeeperClient zkClient, String servicePath) {
+    this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Creates a new singleton service, identified by {@code servicePath}.  All nodes related to the
+   * service (for both leader election and service registration) will live under the path and each
+   * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s
+   * are used to manage a singleton service - one for leader election, and another for the
+   * {@code ServerSet} where the leader's endpoints are registered.  Leadership election should
+   * guarantee that at most one instance will ever exist in the ServerSet at once.
+   *
+   * <p>The election group is the one built by
+   * {@link #createSingletonCandidate(ZooKeeperClient, String, Iterable)}; both groups are created
+   * over the same {@code servicePath}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   */
+  public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) {
+    this(
+        new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)),
+        createSingletonCandidate(zkClient, servicePath, acl));
+  }
+
+  /**
+   * Creates a new singleton service that vies for leadership through the supplied candidate and
+   * advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election; must not be null.
+   * @param candidate The candidacy to use to vie for election; must not be null.
+   */
+  public SingletonService(ServerSet serverSet, Candidate candidate) {
+    Preconditions.checkNotNull(serverSet);
+    Preconditions.checkNotNull(candidate);
+    this.serverSet = serverSet;
+    this.candidate = candidate;
+  }
+
+  /**
+   * Attempts to lead the singleton service.  The {@code status} argument only selects which
+   * deprecation notice is logged; the call is then forwarded to
+   * {@link #lead(InetSocketAddress, Map, LeadershipListener)}.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in the service.
+   * @param additionalEndpoints Additional endpoints that are available on the host.
+   * @param status deprecated, will be ignored entirely
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
+   * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+   * @throws InterruptedException If the thread watching/joining the group was interrupted.
+   * @deprecated The status field is deprecated. Please use
+   *            {@link #lead(InetSocketAddress, Map, LeadershipListener)}
+   */
+  @Deprecated
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final Status status,
+                   final LeadershipListener listener)
+                   throws Group.WatchException, Group.JoinException, InterruptedException {
+
+    if (status == Status.ALIVE) {
+      // The requested status matches what will be used anyway, so a warning is enough.
+      LOG.warning("******************************************************************************");
+      LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+      LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)");
+      LOG.warning("******************************************************************************");
+    } else {
+      // The caller asked for a status that is not going to be honoured.
+      LOG.severe("******************************************************************************");
+      LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+      LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status);
+      LOG.severe("******************************************************************************");
+    }
+
+    lead(endpoint, additionalEndpoints, listener);
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * <p>On election the listener receives a {@link LeaderControl}; calling
+   * {@link LeaderControl#advertise()} joins the server set with the given endpoints.  On defeat
+   * the listener receives the endpoint status of that advertisement, or {@code null} when the
+   * leader never advertised or has already left.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in the service.
+   * @param additionalEndpoints Additional endpoints that are available on the host.
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
+   * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+   * @throws InterruptedException If the thread watching/joining the group was interrupted.
+   */
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final LeadershipListener listener)
+                   throws Group.WatchException, Group.JoinException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    candidate.offerLeadership(new Leader() {
+      // Status of the current advertisement, published by the LeaderControl below so that
+      // onDefeated can hand it to the listener.  Volatile because it is written inside the
+      // control's synchronized methods but read here without holding that lock.
+      private volatile EndpointStatus endpointStatus = null;
+
+      @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+        // Drop anything left over from an earlier election before handing out a new control.
+        endpointStatus = null;
+
+        listener.onLeading(new LeaderControl() {
+          // Deliberately not named endpointStatus: a field of that name here would shadow the
+          // Leader's field and leave onDefeated with a permanently null status.
+          EndpointStatus advertisedStatus = null;
+          final AtomicBoolean left = new AtomicBoolean(false);
+
+          // Methods are synchronized to prevent simultaneous invocations.
+          @Override public synchronized void advertise()
+              throws JoinException, InterruptedException {
+
+            Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+            Preconditions.checkState(advertisedStatus == null, "Cannot advertise more than once.");
+            advertisedStatus = serverSet.join(endpoint, additionalEndpoints);
+            endpointStatus = advertisedStatus;
+          }
+
+          @Override public synchronized void leave() throws UpdateException, JoinException {
+            Preconditions.checkState(left.compareAndSet(false, true),
+                "Cannot leave more than once.");
+            if (advertisedStatus != null) {
+              advertisedStatus.leave();
+              // The advertisement is gone, so there is no status left to report on defeat.
+              endpointStatus = null;
+            }
+            abdicate.execute();
+          }
+        });
+      }
+
+      @Override public void onDefeated() {
+        listener.onDefeated(endpointStatus);
+      }
+    });
+  }
+
+  /**
+   * A listener to be notified of changes in the leadership status.
+   * Implementers should be careful to avoid blocking operations in these callbacks.
+   */
+  public interface LeadershipListener {
+
+    /**
+     * Notifies the listener that it is the current leader.
+     *
+     * @param control A controller handle to advertise and/or leave advertised presence.
+     */
+    void onLeading(LeaderControl control);
+
+    /**
+     * Notifies the listener that it is no longer leader.  The leader should take this opportunity
+     * to remove its advertisement gracefully.
+     *
+     * @param status A handle on the endpoint status for the advertised leader; may be null.
+     */
+    void onDefeated(@Nullable EndpointStatus status);
+  }
+
+  /**
+   * A leadership listener that decorates another listener by automatically defeating a
+   * leader that has dropped its connection to ZooKeeper.
+   * Note that the decision to use this over session-based mutual exclusion should not be taken
+   * lightly.  Any momentary connection loss due to a flaky network or a ZooKeeper server process
+   * exit will cause a leader to abort.
+   */
+  public static class DefeatOnDisconnectLeader implements LeadershipListener {
+
+    private final LeadershipListener wrapped;
+    // Present only while this listener is leading and has not yet left or been defeated.
+    private Optional<LeaderControl> maybeControl = Optional.absent();
+
+    /**
+     * Creates a new leadership listener that will delegate calls to the wrapped listener, and
+     * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is observed while
+     * leading.
+     *
+     * @param zkClient The ZooKeeper client to watch for disconnect events.
+     * @param wrapped The leadership listener to wrap.
+     */
+    public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) {
+      this.wrapped = Preconditions.checkNotNull(wrapped);
+
+      zkClient.register(new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          if ((event.getType() == EventType.None)
+              && (event.getState() == KeeperState.Disconnected)) {
+            disconnected();
+          }
+        }
+      });
+    }
+
+    /**
+     * Handles a disconnect: while leading, tells the wrapped listener it was defeated and then
+     * leaves through the held control, unless the listener already left during that callback.
+     */
+    private synchronized void disconnected() {
+      if (maybeControl.isPresent()) {
+        LOG.warning("Disconnected from ZooKeeper while leading, committing suicide.");
+        try {
+          wrapped.onDefeated(null);
+          // The wrapped listener may leave through the control it was given while handling the
+          // defeat, which clears maybeControl; leaving again here would fail, so re-check.
+          if (maybeControl.isPresent()) {
+            maybeControl.get().leave();
+          }
+        } catch (UpdateException e) {
+          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+        } catch (JoinException e) {
+          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+        } finally {
+          setControl(null);
+        }
+      } else {
+        LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader.");
+      }
+    }
+
+    /** Records the control to leave through on disconnect, or clears it when given null. */
+    private synchronized void setControl(@Nullable LeaderControl control) {
+      this.maybeControl = Optional.fromNullable(control);
+    }
+
+    /**
+     * Remembers {@code control} for disconnect handling and passes the wrapped listener a control
+     * that forgets it again when the listener leaves.
+     */
+    @Override public void onLeading(final LeaderControl control) {
+      setControl(control);
+      wrapped.onLeading(new LeaderControl() {
+        @Override public void advertise() throws JoinException, InterruptedException {
+          control.advertise();
+        }
+
+        @Override public void leave() throws UpdateException, JoinException {
+          setControl(null);
+          control.leave();
+        }
+      });
+    }
+
+    /** Forgets the held control and forwards the defeat to the wrapped listener. */
+    @Override public void onDefeated(@Nullable EndpointStatus status) {
+      setControl(null);
+      wrapped.onDefeated(status);
+    }
+  }
+
+  /**
+   * A controller for the state of the leader.  This will be provided to the leader upon election,
+   * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and
+   * terminate leadership at will.
+   *
+   * <p>The control handed out by {@link SingletonService#lead(InetSocketAddress, Map,
+   * LeadershipListener)} fails with an {@link IllegalStateException} when {@link #advertise()} is
+   * called more than once or after {@link #leave()}, and when {@link #leave()} is called more
+   * than once.
+   */
+  public interface LeaderControl {
+
+    /**
+     * Advertises the leader's server presence to clients.
+     *
+     * @throws JoinException If there was an error advertising.
+     * @throws InterruptedException If interrupted while advertising.
+     */
+    void advertise() throws JoinException, InterruptedException;
+
+    /**
+     * Leaves candidacy for leadership, removing advertised server presence if applicable.
+     *
+     * @throws UpdateException If the leader's status could not be updated.
+     * @throws JoinException If there was an error abdicating from leader election.
+     */
+    void leave() throws UpdateException, JoinException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
new file mode 100644
index 0000000..c10bd86
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
@@ -0,0 +1,132 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * A server set that represents a fixed set of hosts.
+ * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is
+ * present.
+ * A static server set does not support joining, but will allow normal join calls and status update
+ * calls to be made.
+ */
+public class StaticServerSet implements ServerSet {
+
+  private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName());
+
+  private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE =
+      new Function<Endpoint, ServiceInstance>() {
+        @Override public ServiceInstance apply(Endpoint endpoint) {
+          return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE);
+        }
+      };
+
+  private final ImmutableSet<ServiceInstance> hosts;
+
+  /**
+   * Creates a static server set that will reply to monitor calls immediately and exactly once with
+   * the provided service instances.
+   *
+   * @param hosts Hosts in the static set.
+   */
+  public StaticServerSet(Set<ServiceInstance> hosts) {
+    this.hosts = ImmutableSet.copyOf(hosts);
+  }
+
+  /**
+   * Creates a static server set containing the provided endpoints (and no auxiliary ports) which
+   * will all be in the {@link Status#ALIVE} state.
+   *
+   * @param endpoints Endpoints in the static set.
+   * @return A static server set that will advertise the provided endpoints.
+   */
+  public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
+    return new StaticServerSet(
+        ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE)));
+  }
+
+  private EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      Optional<Integer> shardId) {
+
+    LOG.warning("Attempt to join fixed server set ignored.");
+    ServiceInstance joining = new ServiceInstance(
+        ServerSets.toEndpoint(endpoint),
+        Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
+        Status.ALIVE);
+    if (shardId.isPresent()) {
+      joining.setShard(shardId.get());
+    }
+    if (!hosts.contains(joining)) {
+      LOG.log(Level.SEVERE,
+          "Joining instance " + joining + " does not match any member of the static set.");
+    }
+
+    return new EndpointStatus() {
+      @Override public void leave() throws UpdateException {
+        LOG.warning("Attempt to adjust state of fixed server set ignored.");
+      }
+
+      @Override public void update(Status status) throws UpdateException {
+        LOG.warning("Attempt to adjust state of fixed server set ignored.");
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      Status status) {
+
+    LOG.warning("This method is deprecated. Please do not specify a status field.");
+    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints) {
+
+    LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break.");
+    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      int shardId) throws JoinException, InterruptedException {
+
+    return join(endpoint, auxEndpoints, Optional.of(shardId));
+  }
+
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
+    monitor.onChange(hosts);
+    return Commands.NOOP;
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    watch(monitor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..a051611
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,496 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.InetSocketAddressHelper;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ */
+public class ZooKeeperClient {
+
+  /**
+   * Indicates an error connecting to a zookeeper cluster.
+   *
+   * <p>This is a checked exception.  Note that it is declared as a (non-static) inner class, so
+   * every instance is associated with an enclosing {@code ZooKeeperClient}.
+   */
+  public class ZooKeeperConnectionException extends Exception {
+    /**
+     * @param message a description of the connection problem
+     * @param cause the underlying failure
+     */
+    public ZooKeeperConnectionException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * A user's credentials, together with the ability to apply them to a {@link ZooKeeper} client.
+   */
+  public interface Credentials {
+
+    /**
+     * Authenticates these credentials against the given {@code ZooKeeper} client.
+     *
+     * @param zooKeeper the client to authenticate
+     */
+    void authenticate(ZooKeeper zooKeeper);
+
+    /**
+     * Returns the authentication scheme these credentials are for.
+     *
+     * @return the scheme, or {@code null} if no authentication is intended.
+     */
+    @Nullable
+    String scheme();
+
+    /**
+     * Returns the authentication token.
+     *
+     * @return the token, or {@code null} if no authentication is intended.
+     */
+    @Nullable
+    byte[] authToken();
+
+    /**
+     * The empty {@code Credentials}: authenticating is a no-op and both the scheme and the token
+     * are {@code null}.
+     */
+    Credentials NONE = new Credentials() {
+      @Override public void authenticate(ZooKeeper zooKeeper) {
+        // Nothing to do: no authentication is intended.
+      }
+
+      @Override public String scheme() {
+        return null;
+      }
+
+      @Override public byte[] authToken() {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Creates a set of credentials for the zoo keeper digest authentication mechanism.
+   *
+   * @param username the username to authenticate with
+   * @param password the password to authenticate with
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials digestCredentials(String username, String password) {
+    MorePreconditions.checkNotBlank(username);
+    Preconditions.checkNotNull(password);
+
+    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+    // Charset explicitly.
+    return credentials("digest", (username + ":" + password).getBytes());
+  }
+
+  /**
+   * Creates a set of credentials for the given authentication {@code scheme}.
+   *
+   * <p>The returned credentials compare equal to any other {@code Credentials} that report an
+   * equal scheme and a token with the same byte content; {@code hashCode} is consistent with
+   * that definition.
+   *
+   * @param scheme the scheme to authenticate with; checked with
+   *     {@code MorePreconditions.checkNotBlank}
+   * @param authToken the authentication token; must not be {@code null}
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials credentials(final String scheme, final byte[] authToken) {
+    MorePreconditions.checkNotBlank(scheme);
+    Preconditions.checkNotNull(authToken);
+
+    return new Credentials() {
+      @Override public void authenticate(ZooKeeper zooKeeper) {
+        zooKeeper.addAuthInfo(scheme, authToken);
+      }
+
+      @Override public String scheme() {
+        return scheme;
+      }
+
+      @Override public byte[] authToken() {
+        return authToken;
+      }
+
+      @Override public boolean equals(Object o) {
+        if (!(o instanceof Credentials)) {
+          return false;
+        }
+
+        Credentials other = (Credentials) o;
+        return new EqualsBuilder()
+            .append(scheme, other.scheme())
+            .append(authToken, other.authToken())
+            .isEquals();
+      }
+
+      @Override public int hashCode() {
+        // Hash the token by content.  Passing the array itself to Objects.hashCode would hash it
+        // by identity, giving credentials that are equal per equals() different hash codes.
+        return Objects.hashCode(scheme, Arrays.hashCode(authToken));
+      }
+    };
+  }
+
+  /**
+   * Immutable holder for the id and password of a ZooKeeper session.
+   *
+   * <p>Declared static: it uses nothing from the enclosing client, so it should not carry a
+   * hidden reference to it.
+   */
+  private static final class SessionState {
+    private final long sessionId;
+    private final byte[] sessionPasswd;
+
+    private SessionState(long sessionId, byte[] sessionPasswd) {
+      this.sessionId = sessionId;
+      this.sessionPasswd = sessionPasswd;
+    }
+  }
+
+  private static final Logger LOG = Logger.getLogger(ZooKeeperClient.class.getName());
+
+  // The connection timeout passed by get(); a zero value makes get(Amount) wait without a limit.
+  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+  // The ZK session timeout, converted to milliseconds at construction.
+  private final int sessionTimeoutMs;
+  // Applied to each connection once it has been established.
+  private final Credentials credentials;
+  // Comma-joined server addresses, followed by the chroot path when one was supplied.
+  private final String zooKeeperServers;
+  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+  // made from within long synchronized blocks.
+  private volatile ZooKeeper zooKeeper;
+  // Id and password of the current session; set after a successful connect, cleared by close().
+  private SessionState sessionState;
+
+  // Top-level watchers; each one is handed every event taken off eventQueue.
+  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+  // Events queued by the connection watcher for delivery on the watcher-processor thread.
+  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
+
+  /**
+   * Merges the required {@code address} and any additional {@code addresses} into one immutable
+   * set, with {@code address} added first.
+   */
+  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+      InetSocketAddress... addresses) {
+    ImmutableSet.Builder<InetSocketAddress> cluster = ImmutableSet.builder();
+    cluster.add(address);
+    cluster.add(addresses);
+    return cluster.build();
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.  The client is configured with {@link Credentials#NONE} and no chroot path.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    // Fold the required server and the extras into one collection, then delegate.
+    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  The client is configured with {@link Credentials#NONE} and no chroot path.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that connects lazily on the first call to {@link #get()}.
+   * Every successful connection is authenticated with the given {@code credentials}; no chroot
+   * path is used.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(
+      Amount<Integer, Time> sessionTimeout,
+      Credentials credentials,
+      InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    this(
+        sessionTimeout,
+        credentials,
+        Optional.<String>absent(),
+        combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that connects lazily on the first call to {@link #get}.
+   * Every successful connection is authenticated with the given {@code credentials}; no chroot
+   * path is used.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(
+      Amount<Integer, Time> sessionTimeout,
+      Credentials credentials,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, credentials, Optional.<String>absent(), zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * <p>Construction also starts a daemon thread that takes events queued by the connection
+   * watcher and hands each one to every registered top-level {@code Watcher}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param chrootPath an optional chroot path; validated with {@code PathUtils.validatePath}
+   *     when present
+   * @param zooKeeperServers the set of servers forming the ZK cluster; must not be empty
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+    this.credentials = Preconditions.checkNotNull(credentials);
+
+    if (chrootPath.isPresent()) {
+      PathUtils.validatePath(chrootPath.get());
+    }
+
+    Preconditions.checkNotNull(zooKeeperServers);
+    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+        "Must present at least 1 ZK server");
+
+    // Build the connect string before starting the dispatcher thread, so that a failure here
+    // cannot leave an orphaned thread behind.
+    Iterable<String> servers =
+        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+            InetSocketAddressHelper.INET_TO_STR);
+    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+
+    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            WatchedEvent event = eventQueue.take();
+            for (Watcher watcher : watchers) {
+              try {
+                watcher.process(event);
+              } catch (RuntimeException e) {
+                // One misbehaving watcher must not terminate this thread; that would silently
+                // stop event delivery to every other registered watcher.
+                LOG.log(Level.WARNING,
+                    "Watcher " + watcher + " failed to process event: " + event, e);
+              }
+            }
+          } catch (InterruptedException e) { /* ignore */ }
+        }
+      }
+    };
+    watcherProcessor.setDaemon(true);
+    watcherProcessor.start();
+  }
+
+  /**
+   * Reports whether this client carries usable credentials: a non-empty scheme together with a
+   * non-null token.  A client constructed with {@link Credentials#NONE} yields {@code false}.
+   *
+   * @return {@code true} if this client is configured with non-empty credentials.
+   */
+  public boolean hasCredentials() {
+    if (Strings.isNullOrEmpty(credentials.scheme())) {
+      return false;
+    }
+    return credentials.authToken() != null;
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This
+   * method will attempt to re-use sessions when possible.  Equivalent to:
+   * <pre>get(Amount.of(0L, ...))</pre>
+   *
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   */
+  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+    try {
+      return get(WAIT_FOREVER);
+    } catch (TimeoutException e) {
+      // A zero timeout requests an unbounded wait, so a timeout is not expected here; surface it
+      // through the declared InterruptedException with the original attached as the cause.
+      throw (InterruptedException) new InterruptedException(
+          "Got an unexpected TimeoutException for 0 wait").initCause(e);
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out.  This
+   * method will attempt to re-use sessions when possible.
+   *
+   * <p>If the wait for the connection times out or is interrupted, the partially established
+   * connection is closed before the exception propagates, so a later call starts afresh.
+   *
+   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+   *     cluster to be established; a value of 0 or less waits forever
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   * @throws TimeoutException if a connection could not be established within
+   *     {@code connectionTimeout}
+   */
+  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+    if (zooKeeper == null) {
+      final CountDownLatch connected = new CountDownLatch(1);
+      Watcher watcher = new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          switch (event.getType()) {
+            // Guard the None type since this watch may be used as the default watch on calls by
+            // the client outside our control.
+            case None:
+              switch (event.getState()) {
+                case Expired:
+                  LOG.info("Zookeeper session expired. Event: " + event);
+                  close();
+                  break;
+                case SyncConnected:
+                  connected.countDown();
+                  break;
+                default:
+                  // Other connection states need no handling here.
+                  break;
+              }
+              break;
+            default:
+              // Node events are only forwarded to the registered watchers below.
+              break;
+          }
+
+          // Hand every event to the dispatcher thread for delivery to registered watchers.
+          eventQueue.offer(event);
+        }
+      };
+
+      try {
+        zooKeeper = (sessionState != null)
+          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+            sessionState.sessionPasswd)
+          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(
+            "Problem connecting to servers: " + zooKeeperServers, e);
+      }
+
+      try {
+        if (connectionTimeout.getValue() > 0) {
+          if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+            close();
+            throw new TimeoutException("Timed out waiting for a ZK connection after "
+                                       + connectionTimeout);
+          }
+        } else {
+          connected.await();
+        }
+      } catch (InterruptedException ex) {
+        // Applies to both the timed and the untimed wait: never leave behind a client that was
+        // assigned but neither connected nor authenticated.
+        LOG.info("Interrupted while waiting to connect to zooKeeper");
+        close();
+        throw ex;
+      }
+      credentials.authenticate(zooKeeper);
+
+      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    }
+    return zooKeeper;
+  }
+
+  /**
+   * Clients that need to re-establish state after session expiration can register an
+   * {@code onExpired} command to execute.
+   *
+   * @param onExpired the {@code Command} to register
+   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+   *     removal.
+   */
+  public Watcher registerExpirationHandler(final Command onExpired) {
+    Watcher watcher = new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+          onExpired.execute();
+        }
+      }
+    };
+    register(watcher);
+    return watcher;
+  }
+
+  /**
+   * Clients that need to register a top-level {@code Watcher} should do so using this method.  The
+   * registered {@code watcher} will remain registered across re-connects and session expiration
+   * events.
+   *
+   * @param watcher the {@code Watcher} to register
+   */
+  public void register(Watcher watcher) {
+    watchers.add(watcher);
+  }
+
+  /**
+   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+   * registered.
+   *
+   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+   * @return whether the given {@code Watcher} was found and removed from the active set
+   */
+  public boolean unregister(Watcher watcher) {
+    return watchers.remove(watcher);
+  }
+
+  /**
+   * Checks to see if the client might reasonably re-try an operation given the exception thrown
+   * while attempting it.  If the ZooKeeper session should be expired to enable the re-try to
+   * succeed this method will expire it as a side-effect.
+   *
+   * @param e the exception to test
+   * @return true if a retry can be attempted
+   */
+  public boolean shouldRetry(KeeperException e) {
+    if (e instanceof SessionExpiredException) {
+      close();
+    }
+    return ZooKeeperUtils.isRetryable(e);
+  }
+
+  /**
+   * Closes the current connection, if any, expiring the current ZooKeeper session.  Further calls
+   * are no-ops until the next successful {@link #get}.
+   */
+  public synchronized void close() {
+    if (zooKeeper == null) {
+      // Nothing is open.
+      return;
+    }
+
+    try {
+      zooKeeper.close();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for callers before reporting.
+      Thread.currentThread().interrupt();
+      LOG.warning("Interrupted trying to close zooKeeper");
+    } finally {
+      // Forget both the connection and its session, whether or not close() completed.
+      zooKeeper = null;
+      sessionState = null;
+    }
+  }
+
+  /**
+   * Returns {@code true} when this client currently holds no ZooKeeper connection object.
+   */
+  @VisibleForTesting
+  synchronized boolean isClosed() {
+    return zooKeeper == null;
+  }
+
+  /**
+   * Returns the current ZooKeeper connection object without synchronizing and without
+   * attempting to connect; the result is {@code null} when no connection is held.
+   */
+  @VisibleForTesting
+  ZooKeeper getZooKeeperClientForTests() {
+    return zooKeeper;
+  }
+}