You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:32 UTC
[18/37] aurora git commit: Import of Twitter Commons.
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
new file mode 100644
index 0000000..646fab9
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
@@ -0,0 +1,175 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Ordering;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.UpdateException;
+import com.twitter.common.zookeeper.Group.WatchException;
+import org.apache.zookeeper.data.ACL;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
+ * A typical usage would have a client on each of several hosts joining a logical partition (a
+ * "partition group") that represents some shared work. Clients could then process a subset of a
+ * full body of work by testing any given item of work with their partition filter.
+ *
+ * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
+ * partition as explained in {@link #join()}.
+ *
+ * @author John Sirois
+ */
+public class Partitioner {
+
+ private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());
+
+ private volatile int groupSize;
+ private volatile int groupIndex;
+ private final Group group;
+
+ /**
+ * Constructs a representation of a partition group but does not join it. Note that the partition
+ * group path will be created as a persistent zookeeper path if it does not already exist.
+ *
+ * @param zkClient a client to use for joining the partition group and watching its membership
+ * @param acl the acl for this partition group
+ * @param path a zookeeper path that represents the partition group
+ */
+ public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
+ group = new Group(zkClient, acl, path);
+ }
+
+ @VisibleForTesting
+ int getGroupSize() {
+ return groupSize;
+ }
+
+ /**
+ * Represents a slice of a partition group. The partition is dynamic and will adjust its size as
+ * members join and leave its partition group.
+ */
+ public abstract static class Partition implements Predicate<Long>, Membership {
+
+ /**
+ * Returns {@code true} if the given {@code value} is a member of this partition at this time.
+ */
+ public abstract boolean isMember(long value);
+
+ /**
+ * Gets number of members in the group at this time.
+ *
+ * @return number of members in the ZK group at this time.
+ */
+ public abstract int getNumPartitions();
+
+ /**
+ * Evaluates partition membership based on the given {@code value}'s hash code. If the value
+ * is null it is never a member of a partition.
+ */
+ boolean isMember(Object value) {
+ return (value != null) && isMember(value.hashCode());
+ }
+
+ /**
+ * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
+ * overhead.
+ */
+ @Override
+ public boolean apply(@Nullable Long input) {
+ return (input != null) && isMember(input);
+ }
+ }
+
+ /**
+ * Attempts to join the partition group and claim a slice. When successful, a predicate is
+ * returned that can be used to test whether or not an item belongs to this partition. The
+ * predicate is dynamic such that as the group is further partitioned or partitions merge the
+ * predicate will claim a narrower or wider swath of the partition space respectively. Partition
+ * creation and merging is not instantaneous and clients should expect independent partitions to
+ * claim ownership of some items when partition membership is in flux. It is only in the steady
+ * state that a client should expect independent partitions to divide the partition space evenly
+ * and without overlap.
+ *
+ * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
+ *
+ * @return the partition representing the slice of the partition group this member can claim
+ * @throws JoinException if there was a problem joining the partition group
+ * @throws InterruptedException if interrupted while waiting to join the partition group
+ */
+ public final Partition join() throws JoinException, InterruptedException {
+ final Membership membership = group.join();
+ try {
+ group.watch(createGroupChangeListener(membership));
+ } catch (WatchException e) {
+ membership.cancel();
+ throw new JoinException("Problem establishing watch on group after joining it", e);
+ }
+ return new Partition() {
+ @Override public boolean isMember(long value) {
+ return (value % groupSize) == groupIndex;
+ }
+
+ @Override public int getNumPartitions() {
+ return groupSize;
+ }
+
+ @Override public String getGroupPath() {
+ return membership.getGroupPath();
+ }
+
+ @Override public String getMemberId() {
+ return membership.getMemberId();
+ }
+
+ @Override public String getMemberPath() {
+ return membership.getMemberPath();
+ }
+
+ @Override public byte[] updateMemberData() throws UpdateException {
+ return membership.updateMemberData();
+ }
+
+ @Override public void cancel() throws JoinException {
+ membership.cancel();
+ }
+ };
+ }
+
+ @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
+ return new GroupChangeListener() {
+ @Override public void onGroupChange(Iterable<String> memberIds) {
+ List<String> members = Ordering.natural().sortedCopy(memberIds);
+ int newSize = members.size();
+ int newIndex = members.indexOf(membership.getMemberId());
+
+ LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
+ membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));
+
+ groupSize = newSize;
+ groupIndex = newIndex;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..b6a0686
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
@@ -0,0 +1,117 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A logical set of servers registered in ZooKeeper. Intended to be used by both servers in a
+ * common service and their clients.
+ *
+ * TODO(William Farner): Explore decoupling this from thrift.
+ */
+public interface ServerSet extends DynamicHostSet<ServiceInstance> {
+
+ /**
+ * Attempts to join a server set for this logical service group.
+ *
+ * @param endpoint the primary service endpoint
+ * @param additionalEndpoints and additional endpoints keyed by their logical name
+ * @param status the current service status
+ * @return an EndpointStatus object that allows the endpoint to adjust its status
+ * @throws JoinException if there was a problem joining the server set
+ * @throws InterruptedException if interrupted while waiting to join the server set
+ * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)}
+ */
+ @Deprecated
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ Status status) throws JoinException, InterruptedException;
+
+ /**
+ * Attempts to join a server set for this logical service group.
+ *
+ * @param endpoint the primary service endpoint
+ * @param additionalEndpoints and additional endpoints keyed by their logical name
+ * @return an EndpointStatus object that allows the endpoint to adjust its status
+ * @throws JoinException if there was a problem joining the server set
+ * @throws InterruptedException if interrupted while waiting to join the server set
+ */
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints)
+ throws JoinException, InterruptedException;
+
+ /**
+ * Attempts to join a server set for this logical service group.
+ *
+ * @param endpoint the primary service endpoint
+ * @param additionalEndpoints and additional endpoints keyed by their logical name
+ * @param shardId Unique shard identifier for this member of the service.
+ * @return an EndpointStatus object that allows the endpoint to adjust its status
+ * @throws JoinException if there was a problem joining the server set
+ * @throws InterruptedException if interrupted while waiting to join the server set
+ */
+ EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ int shardId) throws JoinException, InterruptedException;
+
+ /**
+ * A handle to a service endpoint's status data that allows updating it to track current events.
+ */
+ public interface EndpointStatus {
+
+ /**
+ * Attempts to update the status of the service endpoint associated with this endpoint. If the
+ * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set.
+ *
+ * @param status the current status of the endpoint
+ * @throws UpdateException if there was a problem writing the update
+ * @deprecated Support for mutable status is deprecated. Please use {@link #leave()}
+ */
+ @Deprecated
+ void update(Status status) throws UpdateException;
+
+ /**
+ * Removes the endpoint from the server set.
+ *
+ * @throws UpdateException if there was a problem leaving the ServerSet.
+ */
+ void leave() throws UpdateException;
+ }
+
+ /**
+ * Indicates an error updating a service's status information.
+ */
+ public static class UpdateException extends Exception {
+ public UpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UpdateException(String message) {
+ super(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ec6b3f7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,609 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.gson.Gson;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Function;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.io.Codec;
+import com.twitter.common.io.CompatibilityCodec;
+import com.twitter.common.io.ThriftCodec;
+import com.twitter.common.util.BackoffHelper;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet}.
+ */
+public class ServerSetImpl implements ServerSet {
+ private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName());
+
+ @CmdLine(name = "serverset_encode_json",
+ help = "If true, use JSON for encoding server set information."
+ + " Defaults to true (use JSON).")
+ private static final Arg<Boolean> ENCODE_JSON = Arg.create(true);
+
+ private final ZooKeeperClient zkClient;
+ private final Group group;
+ private final Codec<ServiceInstance> codec;
+ private final BackoffHelper backoffHelper;
+
+ /**
+ * Creates a new ServerSet using open ZooKeeper node ACLs.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param path the name-service path of the service to connect to
+ */
+ public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+ this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+ }
+
+ /**
+ * Creates a new ServerSet for the given service {@code path}.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param acl the ACL to use for creating the persistent group path if it does not already exist
+ * @param path the name-service path of the service to connect to
+ */
+ public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+ this(zkClient, new Group(zkClient, acl, path), createDefaultCodec());
+ }
+
+ /**
+ * Creates a new ServerSet using the given service {@code group}.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param group the server group
+ */
+ public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+ this(zkClient, group, createDefaultCodec());
+ }
+
+ /**
+ * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param group the server group
+ * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+ * from a byte array
+ */
+ public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+ this.zkClient = checkNotNull(zkClient);
+ this.group = checkNotNull(group);
+ this.codec = checkNotNull(codec);
+
+ // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+ backoffHelper = new BackoffHelper();
+ }
+
+ @VisibleForTesting
+ ZooKeeperClient getZkClient() {
+ return zkClient;
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints)
+ throws JoinException, InterruptedException {
+
+ LOG.log(Level.WARNING,
+ "Joining a ServerSet without a shard ID is deprecated and will soon break.");
+ return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ int shardId) throws JoinException, InterruptedException {
+
+ return join(endpoint, additionalEndpoints, Optional.of(shardId));
+ }
+
+ private EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ Optional<Integer> shardId) throws JoinException, InterruptedException {
+
+ checkNotNull(endpoint);
+ checkNotNull(additionalEndpoints);
+
+ final MemberStatus memberStatus =
+ new MemberStatus(endpoint, additionalEndpoints, shardId);
+ Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() {
+ @Override public byte[] get() {
+ return memberStatus.serializeServiceInstance();
+ }
+ };
+ final Membership membership = group.join(serviceInstanceSupplier);
+
+ return new EndpointStatus() {
+ @Override public void update(Status status) throws UpdateException {
+ checkNotNull(status);
+ LOG.warning("This method is deprecated. Please use leave() instead.");
+ if (status == Status.DEAD) {
+ leave();
+ } else {
+ LOG.warning("Status update has been ignored");
+ }
+ }
+
+ @Override public void leave() throws UpdateException {
+ memberStatus.leave(membership);
+ }
+ };
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ Status status) throws JoinException, InterruptedException {
+
+ LOG.warning("This method is deprecated. Please do not specify a status field.");
+ if (status != Status.ALIVE) {
+ LOG.severe("**************************************************************************\n"
+ + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n"
+ + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status
+ + "\n**************************************************************************");
+ }
+ return join(endpoint, additionalEndpoints);
+ }
+
+ @Override
+ public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+ ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+ try {
+ return serverSetWatcher.watch();
+ } catch (WatchException e) {
+ throw new MonitorException("ZooKeeper watch failed.", e);
+ } catch (InterruptedException e) {
+ throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+ }
+ }
+
+ @Override
+ public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+ LOG.warning("This method is deprecated. Please use watch instead.");
+ watch(monitor);
+ }
+
+ private class MemberStatus {
+ private final InetSocketAddress endpoint;
+ private final Map<String, InetSocketAddress> additionalEndpoints;
+ private final Optional<Integer> shardId;
+
+ private MemberStatus(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ Optional<Integer> shardId) {
+
+ this.endpoint = endpoint;
+ this.additionalEndpoints = additionalEndpoints;
+ this.shardId = shardId;
+ }
+
+ synchronized void leave(Membership membership) throws UpdateException {
+ try {
+ membership.cancel();
+ } catch (JoinException e) {
+ throw new UpdateException(
+ "Failed to auto-cancel group membership on transition to DEAD status", e);
+ }
+ }
+
+ byte[] serializeServiceInstance() {
+ ServiceInstance serviceInstance = new ServiceInstance(
+ ServerSets.toEndpoint(endpoint),
+ Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+ Status.ALIVE);
+
+ if (shardId.isPresent()) {
+ serviceInstance.setShard(shardId.get());
+ }
+
+ LOG.fine("updating endpoint data to:\n\t" + serviceInstance);
+ try {
+ return ServerSets.serializeServiceInstance(serviceInstance, codec);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unexpected problem serializing thrift struct " +
+ serviceInstance + "to a byte[]", e);
+ }
+ }
+ }
+
+ private static class ServiceInstanceFetchException extends RuntimeException {
+ ServiceInstanceFetchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ private static class ServiceInstanceDeletedException extends RuntimeException {
+ ServiceInstanceDeletedException(String path) {
+ super(path);
+ }
+ }
+
+ private class ServerSetWatcher {
+ private final ZooKeeperClient zkClient;
+ private final HostChangeMonitor<ServiceInstance> monitor;
+ @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+ ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+ this.zkClient = zkClient;
+ this.monitor = monitor;
+ }
+
+ public Command watch() throws WatchException, InterruptedException {
+ Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() {
+ @Override public void execute() {
+ // Servers may have changed Status while we were disconnected from ZooKeeper, check and
+ // re-register our node watches.
+ rebuildServerSet();
+ }
+ });
+
+ try {
+ return group.watch(new GroupChangeListener() {
+ @Override public void onGroupChange(Iterable<String> memberIds) {
+ notifyGroupChange(memberIds);
+ }
+ });
+ } catch (WatchException e) {
+ zkClient.unregister(onExpirationWatcher);
+ throw e;
+ } catch (InterruptedException e) {
+ zkClient.unregister(onExpirationWatcher);
+ throw e;
+ }
+ }
+
+ private ServiceInstance getServiceInstance(final String nodePath) {
+ try {
+ return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() {
+ @Override public ServiceInstance get() {
+ try {
+ byte[] data = zkClient.get().getData(nodePath, false, null);
+ return ServerSets.deserializeServiceInstance(data, codec);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ServiceInstanceFetchException(
+ "Interrupted updating service data for: " + nodePath, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING,
+ "Temporary error trying to updating service data for: " + nodePath, e);
+ return null;
+ } catch (NoNodeException e) {
+ invalidateNodePath(nodePath);
+ throw new ServiceInstanceDeletedException(nodePath);
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING,
+ "Temporary error trying to update service data for: " + nodePath, e);
+ return null;
+ } else {
+ throw new ServiceInstanceFetchException(
+ "Failed to update service data for: " + nodePath, e);
+ }
+ } catch (IOException e) {
+ throw new ServiceInstanceFetchException(
+ "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+ }
+ }
+ });
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ServiceInstanceFetchException(
+ "Interrupted trying to update service data for: " + nodePath, e);
+ }
+ }
+
+ private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+ CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+ @Override public ServiceInstance load(String memberId) {
+ return getServiceInstance(group.getMemberPath(memberId));
+ }
+ });
+
+ private void rebuildServerSet() {
+ Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+ servicesByMemberId.invalidateAll();
+ notifyGroupChange(memberIds);
+ }
+
+ private String invalidateNodePath(String deletedPath) {
+ String memberId = group.getMemberId(deletedPath);
+ servicesByMemberId.invalidate(memberId);
+ return memberId;
+ }
+
+ private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
+ new Function<String, ServiceInstance>() {
+ @Override public ServiceInstance apply(String memberId) {
+ // This get will trigger a fetch
+ try {
+ return servicesByMemberId.getUnchecked(memberId);
+ } catch (UncheckedExecutionException e) {
+ Throwable cause = e.getCause();
+ if (!(cause instanceof ServiceInstanceDeletedException)) {
+ Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+ throw new IllegalStateException(
+ "Unexpected error fetching member data for: " + memberId, e);
+ }
+ return null;
+ }
+ }
+ };
+
+ private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+ ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+ Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+ // Ignore no-op state changes except for the 1st when we've seen no group yet.
+ if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+ SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+ // Implicit removal from servicesByMemberId.
+ existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+ Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+ Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
+
+ notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+ }
+ }
+
+ private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+ // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+ // if the server's status has not changed, we can skip any onChange updates.
+ if (!currentServerSet.equals(serverSet)) {
+ if (currentServerSet.isEmpty()) {
+ LOG.warning("server set empty for path " + group.getPath());
+ } else {
+ if (LOG.isLoggable(Level.INFO)) {
+ if (serverSet == null) {
+ LOG.info("received initial membership " + currentServerSet);
+ } else {
+ logChange(Level.INFO, currentServerSet);
+ }
+ }
+ }
+ serverSet = currentServerSet;
+ monitor.onChange(serverSet);
+ }
+ }
+
+ private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) {
+ StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+ if (serverSet.size() != newServerSet.size()) {
+ message.append("from ").append(serverSet.size())
+ .append(" members to ").append(newServerSet.size());
+ }
+
+ Joiner joiner = Joiner.on("\n\t\t");
+
+ SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+ if (!left.isEmpty()) {
+ message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+ }
+
+ SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+ if (!joined.isEmpty()) {
+ message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+ }
+
+ LOG.log(level, message.toString());
+ }
+ }
+
+ private static class EndpointSchema {
+ final String host;
+ final Integer port;
+
+ EndpointSchema(Endpoint endpoint) {
+ Preconditions.checkNotNull(endpoint);
+ this.host = endpoint.getHost();
+ this.port = endpoint.getPort();
+ }
+
+ String getHost() {
+ return host;
+ }
+
+ Integer getPort() {
+ return port;
+ }
+ }
+
+ private static class ServiceInstanceSchema {
+ final EndpointSchema serviceEndpoint;
+ final Map<String, EndpointSchema> additionalEndpoints;
+ final Status status;
+ final Integer shard;
+
+ ServiceInstanceSchema(ServiceInstance instance) {
+ this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+ if (instance.getAdditionalEndpoints() != null) {
+ this.additionalEndpoints = Maps.transformValues(
+ instance.getAdditionalEndpoints(),
+ new Function<Endpoint, EndpointSchema>() {
+ @Override public EndpointSchema apply(Endpoint endpoint) {
+ return new EndpointSchema(endpoint);
+ }
+ }
+ );
+ } else {
+ this.additionalEndpoints = Maps.newHashMap();
+ }
+ this.status = instance.getStatus();
+ this.shard = instance.isSetShard() ? instance.getShard() : null;
+ }
+
+ EndpointSchema getServiceEndpoint() {
+ return serviceEndpoint;
+ }
+
+ Map<String, EndpointSchema> getAdditionalEndpoints() {
+ return additionalEndpoints;
+ }
+
+ Status getStatus() {
+ return status;
+ }
+
+ Integer getShard() {
+ return shard;
+ }
+ }
+
+ /**
+ * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the
+ * __isset_bit_vector internal thrift struct field that tracks primitive types.
+ */
+ private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
+ private static final Charset ENCODING = Charsets.UTF_8;
+ private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class;
+ private final Gson gson = new Gson();
+
+ @Override
+ public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+ Writer w = new OutputStreamWriter(sink, ENCODING);
+ gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
+ w.flush();
+ }
+
+ @Override
+ public ServiceInstance deserialize(InputStream source) throws IOException {
+ ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS);
+ Endpoint primary = new Endpoint(
+ output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort());
+ Map<String, Endpoint> additional = Maps.transformValues(
+ output.getAdditionalEndpoints(),
+ new Function<EndpointSchema, Endpoint>() {
+ @Override public Endpoint apply(EndpointSchema endpoint) {
+ return new Endpoint(endpoint.getHost(), endpoint.getPort());
+ }
+ }
+ );
+ ServiceInstance instance =
+ new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus());
+ if (output.getShard() != null) {
+ instance.setShard(output.getShard());
+ }
+ return instance;
+ }
+ }
+
+ private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) {
+ final Codec<ServiceInstance> json = new AdaptedJsonCodec();
+ final Codec<ServiceInstance> thrift =
+ ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL);
+ final Predicate<byte[]> recognizer = new Predicate<byte[]>() {
+ public boolean apply(byte[] input) {
+ return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding;
+ }
+ };
+
+ if (useJsonEncoding) {
+ return CompatibilityCodec.create(json, thrift, 2, recognizer);
+ }
+ return CompatibilityCodec.create(thrift, json, 2, recognizer);
+ }
+
+ /**
+ * Creates a codec for {@link ServiceInstance} objects that uses Thrift binary encoding, and can
+ * decode both Thrift and JSON encodings.
+ *
+ * @return a new codec instance.
+ */
+ public static Codec<ServiceInstance> createThriftCodec() {
+ return createCodec(false);
+ }
+
+ /**
+ * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode
+ * both Thrift and JSON encodings.
+ *
+ * @return a new codec instance.
+ */
+ public static Codec<ServiceInstance> createJsonCodec() {
+ return createCodec(true);
+ }
+
+ /**
+ * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON
+ * encoding, depending on whether the command line argument <tt>serverset_json_encofing</tt> is
+ * set to <tt>true</tt>, and can decode both Thrift and JSON encodings.
+ *
+ * @return a new codec instance.
+ */
+ public static Codec<ServiceInstance> createDefaultCodec() {
+ return createCodec(ENCODE_JSON.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..370ab6b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
@@ -0,0 +1,135 @@
+package com.twitter.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.Function;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.io.Codec;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * Common ServerSet related functions
+ */
+public class ServerSets {
+
+ private ServerSets() {
+ // Utility class.
+ }
+
+ /**
+ * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+ */
+ public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+ new Function<InetSocketAddress, Endpoint>() {
+ @Override public Endpoint apply(InetSocketAddress address) {
+ return ServerSets.toEndpoint(address);
+ }
+ };
+
+ /**
+ * Creates a server set that registers at a single path applying the given ACL to all nodes
+ * created in the path.
+ *
+ * @param zkClient ZooKeeper client to register with.
+ * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+ * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set)
+ * @return A server set that registers at {@code zkPath}.
+ */
+ public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+ return create(zkClient, acl, ImmutableSet.of(zkPath));
+ }
+
+ /**
+ * Creates a server set that registers at one or multiple paths applying the given ACL to all
+ * nodes created in the paths.
+ *
+ * @param zkClient ZooKeeper client to register with.
+ * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+ * @param zkPaths Paths to register at, must be non-empty.
+ * @return A server set that registers at the given {@code zkPath}s.
+ */
+ public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) {
+ Preconditions.checkNotNull(zkClient);
+ MorePreconditions.checkNotBlank(acl);
+ MorePreconditions.checkNotBlank(zkPaths);
+
+ if (zkPaths.size() == 1) {
+ return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths));
+ } else {
+ ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
+ for (String path : zkPaths) {
+ builder.add(new ServerSetImpl(zkClient, acl, path));
+ }
+ return new CompoundServerSet(builder.build());
+ }
+ }
+
+ /**
+ * Returns a serialized Thrift service instance object, with given endpoints and codec.
+ *
+ * @param serviceInstance the Thrift service instance object to be serialized
+ * @param codec the codec to use to serialize a Thrift service instance object
+ * @return byte array that contains a serialized Thrift service instance
+ */
+ public static byte[] serializeServiceInstance(
+ ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ codec.serialize(serviceInstance, output);
+ return output.toByteArray();
+ }
+
+ /**
+ * Serializes a service instance based on endpoints.
+ * @see #serializeServiceInstance(ServiceInstance, Codec)
+ *
+ * @param address the target address of the service instance
+ * @param additionalEndpoints additional endpoints of the service instance
+ * @param status service status
+ */
+ public static byte[] serializeServiceInstance(
+ InetSocketAddress address,
+ Map<String, Endpoint> additionalEndpoints,
+ Status status,
+ Codec<ServiceInstance> codec) throws IOException {
+
+ ServiceInstance serviceInstance =
+ new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+ return serializeServiceInstance(serviceInstance, codec);
+ }
+
+ /**
+ * Creates a service instance object deserialized from byte array.
+ *
+ * @param data the byte array contains a serialized Thrift service instance
+ * @param codec the codec to use to deserialize the byte array
+ */
+ public static ServiceInstance deserializeServiceInstance(
+ byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+ return codec.deserialize(new ByteArrayInputStream(data));
+ }
+
+ /**
+ * Creates an endpoint for the given InetSocketAddress.
+ *
+ * @param address the target address to create the endpoint for
+ */
+ public static Endpoint toEndpoint(InetSocketAddress address) {
+ return new Endpoint(address.getHostName(), address.getPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..00b8b53
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
@@ -0,0 +1,318 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Candidate.Leader;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
+import com.twitter.common.zookeeper.ServerSet.UpdateException;
+import com.twitter.thrift.Status;
+
+/**
+ * A service that uses master election to only allow a single instance of the server to join
+ * the {@link ServerSet} at a time.
+ */
+public class SingletonService {
+ private static final Logger LOG = Logger.getLogger(SingletonService.class.getName());
+
+ @VisibleForTesting
+ static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+ /**
+ * Creates a candidate that can be combined with an existing server set to form a singleton
+ * service using {@link #SingletonService(ServerSet, Candidate)}.
+ *
+ * @param zkClient The ZooKeeper client to use.
+ * @param servicePath The path where service nodes live.
+ * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+ * @return A candidate that can be housed with a standard server set under a single zk path.
+ */
+ public static Candidate createSingletonCandidate(
+ ZooKeeperClient zkClient,
+ String servicePath,
+ Iterable<ACL> acl) {
+
+ return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+ }
+
+ private final ServerSet serverSet;
+ private final Candidate candidate;
+
+ /**
+ * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default
+ * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+ */
+ public SingletonService(ZooKeeperClient zkClient, String servicePath) {
+ this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ /**
+ * Creates a new singleton service, identified by {@code servicePath}. All nodes related to the
+ * service (for both leader election and service registration) will live under the path and each
+ * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s
+ * are used to manage a singleton service - one for leader election, and another for the
+ * {@code ServerSet} where the leader's endpoints are registered. Leadership election should
+ * guarantee that at most one instance will ever exist in the ServerSet at once.
+ *
+ * @param zkClient The ZooKeeper client to use.
+ * @param servicePath The path where service nodes live.
+ * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+ */
+ public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) {
+ this(
+ new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)),
+ createSingletonCandidate(zkClient, servicePath, acl));
+ }
+
+ /**
+ * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
+ * advertises itself in the given server set once elected.
+ *
+ * @param serverSet The server set to advertise in on election.
+ * @param candidate The candidacy to use to vie for election.
+ */
+ public SingletonService(ServerSet serverSet, Candidate candidate) {
+ this.serverSet = Preconditions.checkNotNull(serverSet);
+ this.candidate = Preconditions.checkNotNull(candidate);
+ }
+
+ /**
+ * Attempts to lead the singleton service.
+ *
+ * @param endpoint The primary endpoint to register as a leader candidate in the service.
+ * @param additionalEndpoints Additional endpoints that are available on the host.
+ * @param status deprecated, will be ignored entirely
+ * @param listener Handler to call when the candidate is elected or defeated.
+ * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
+ * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+ * @throws InterruptedException If the thread watching/joining the group was interrupted.
+ * @deprecated The status field is deprecated. Please use
+ * {@link #lead(InetSocketAddress, Map, LeadershipListener)}
+ */
+ @Deprecated
+ public void lead(final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints,
+ final Status status,
+ final LeadershipListener listener)
+ throws Group.WatchException, Group.JoinException, InterruptedException {
+
+ if (status != Status.ALIVE) {
+ LOG.severe("******************************************************************************");
+ LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+ LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status);
+ LOG.severe("******************************************************************************");
+ } else {
+ LOG.warning("******************************************************************************");
+ LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+ LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)");
+ LOG.warning("******************************************************************************");
+ }
+
+ lead(endpoint, additionalEndpoints, listener);
+ }
+
+ /**
+ * Attempts to lead the singleton service.
+ *
+ * @param endpoint The primary endpoint to register as a leader candidate in the service.
+ * @param additionalEndpoints Additional endpoints that are available on the host.
+ * @param listener Handler to call when the candidate is elected or defeated.
+ * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
+ * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+ * @throws InterruptedException If the thread watching/joining the group was interrupted.
+ */
+ public void lead(final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints,
+ final LeadershipListener listener)
+ throws Group.WatchException, Group.JoinException, InterruptedException {
+
+ Preconditions.checkNotNull(listener);
+
+ candidate.offerLeadership(new Leader() {
+ private EndpointStatus endpointStatus = null;
+ @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+ listener.onLeading(new LeaderControl() {
+ EndpointStatus endpointStatus = null;
+ final AtomicBoolean left = new AtomicBoolean(false);
+
+ // Methods are synchronized to prevent simultaneous invocations.
+ @Override public synchronized void advertise()
+ throws JoinException, InterruptedException {
+
+ Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+ Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+ endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+ }
+
+ @Override public synchronized void leave() throws UpdateException, JoinException {
+ Preconditions.checkState(left.compareAndSet(false, true),
+ "Cannot leave more than once.");
+ if (endpointStatus != null) {
+ endpointStatus.leave();
+ }
+ abdicate.execute();
+ }
+ });
+ }
+
+ @Override public void onDefeated() {
+ listener.onDefeated(endpointStatus);
+ }
+ });
+ }
+
+ /**
+ * A listener to be notified of changes in the leadership status.
+ * Implementers should be careful to avoid blocking operations in these callbacks.
+ */
+ public interface LeadershipListener {
+
+ /**
+ * Notifies the listener that is is current leader.
+ *
+ * @param control A controller handle to advertise and/or leave advertised presence.
+ */
+ public void onLeading(LeaderControl control);
+
+ /**
+ * Notifies the listener that it is no longer leader. The leader should take this opportunity
+ * to remove its advertisement gracefully.
+ *
+ * @param status A handle on the endpoint status for the advertised leader.
+ */
+ public void onDefeated(@Nullable EndpointStatus status);
+ }
+
+ /**
+ * A leadership listener that decorates another listener by automatically defeating a
+ * leader that has dropped its connection to ZooKeeper.
+ * Note that the decision to use this over session-based mutual exclusion should not be taken
+ * lightly. Any momentary connection loss due to a flaky network or a ZooKeeper server process
+ * exit will cause a leader to abort.
+ */
+ public static class DefeatOnDisconnectLeader implements LeadershipListener {
+
+ private final LeadershipListener wrapped;
+ private Optional<LeaderControl> maybeControl = Optional.absent();
+
+ /**
+ * Creates a new leadership listener that will delegate calls to the wrapped listener, and
+ * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is observed while
+ * leading.
+ *
+ * @param zkClient The ZooKeeper client to watch for disconnect events.
+ * @param wrapped The leadership listener to wrap.
+ */
+ public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) {
+ this.wrapped = Preconditions.checkNotNull(wrapped);
+
+ zkClient.register(new Watcher() {
+ @Override public void process(WatchedEvent event) {
+ if ((event.getType() == EventType.None)
+ && (event.getState() == KeeperState.Disconnected)) {
+ disconnected();
+ }
+ }
+ });
+ }
+
+ private synchronized void disconnected() {
+ if (maybeControl.isPresent()) {
+ LOG.warning("Disconnected from ZooKeeper while leading, committing suicide.");
+ try {
+ wrapped.onDefeated(null);
+ maybeControl.get().leave();
+ } catch (UpdateException e) {
+ LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+ } catch (JoinException e) {
+ LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+ } finally {
+ setControl(null);
+ }
+ } else {
+ LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader.");
+ }
+ }
+
+ private synchronized void setControl(@Nullable LeaderControl control) {
+ this.maybeControl = Optional.fromNullable(control);
+ }
+
+ @Override public void onLeading(final LeaderControl control) {
+ setControl(control);
+ wrapped.onLeading(new LeaderControl() {
+ @Override public void advertise() throws JoinException, InterruptedException {
+ control.advertise();
+ }
+
+ @Override public void leave() throws UpdateException, JoinException {
+ setControl(null);
+ control.leave();
+ }
+ });
+ }
+
+ @Override public void onDefeated(@Nullable EndpointStatus status) {
+ setControl(null);
+ wrapped.onDefeated(status);
+ }
+ }
+
+ /**
+ * A controller for the state of the leader. This will be provided to the leader upon election,
+ * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and
+ * terminate leadership at will.
+ */
+ public interface LeaderControl {
+
+ /**
+ * Advertises the leader's server presence to clients.
+ *
+ * @throws JoinException If there was an error advertising.
+ * @throws InterruptedException If interrupted while advertising.
+ */
+ void advertise() throws JoinException, InterruptedException;
+
+ /**
+ * Leaves candidacy for leadership, removing advertised server presence if applicable.
+ *
+ * @throws UpdateException If the leader's status could not be updated.
+ * @throws JoinException If there was an error abdicating from leader election.
+ */
+ void leave() throws UpdateException, JoinException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
new file mode 100644
index 0000000..c10bd86
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
@@ -0,0 +1,132 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * A server set that represents a fixed set of hosts.
+ * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is
+ * present.
+ * A static server set does not support joining, but will allow normal join calls and status update
+ * calls to be made.
+ */
+public class StaticServerSet implements ServerSet {
+
+ private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName());
+
+ private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE =
+ new Function<Endpoint, ServiceInstance>() {
+ @Override public ServiceInstance apply(Endpoint endpoint) {
+ return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE);
+ }
+ };
+
+ private final ImmutableSet<ServiceInstance> hosts;
+
+ /**
+ * Creates a static server set that will reply to monitor calls immediately and exactly once with
+ * the provided service instances.
+ *
+ * @param hosts Hosts in the static set.
+ */
+ public StaticServerSet(Set<ServiceInstance> hosts) {
+ this.hosts = ImmutableSet.copyOf(hosts);
+ }
+
+ /**
+ * Creates a static server set containing the provided endpoints (and no auxiliary ports) which
+ * will all be in the {@link Status#ALIVE} state.
+ *
+ * @param endpoints Endpoints in the static set.
+ * @return A static server set that will advertise the provided endpoints.
+ */
+ public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
+ return new StaticServerSet(
+ ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE)));
+ }
+
+ private EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> auxEndpoints,
+ Optional<Integer> shardId) {
+
+ LOG.warning("Attempt to join fixed server set ignored.");
+ ServiceInstance joining = new ServiceInstance(
+ ServerSets.toEndpoint(endpoint),
+ Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
+ Status.ALIVE);
+ if (shardId.isPresent()) {
+ joining.setShard(shardId.get());
+ }
+ if (!hosts.contains(joining)) {
+ LOG.log(Level.SEVERE,
+ "Joining instance " + joining + " does not match any member of the static set.");
+ }
+
+ return new EndpointStatus() {
+ @Override public void leave() throws UpdateException {
+ LOG.warning("Attempt to adjust state of fixed server set ignored.");
+ }
+
+ @Override public void update(Status status) throws UpdateException {
+ LOG.warning("Attempt to adjust state of fixed server set ignored.");
+ }
+ };
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> auxEndpoints,
+ Status status) {
+
+ LOG.warning("This method is deprecated. Please do not specify a status field.");
+ return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> auxEndpoints) {
+
+ LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break.");
+ return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+ }
+
+ @Override
+ public EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> auxEndpoints,
+ int shardId) throws JoinException, InterruptedException {
+
+ return join(endpoint, auxEndpoints, Optional.of(shardId));
+ }
+
+ @Override
+ public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
+ monitor.onChange(hosts);
+ return Commands.NOOP;
+ }
+
+ @Override
+ public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+ watch(monitor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..a051611
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,496 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.InetSocketAddressHelper;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ */
+public class ZooKeeperClient {
+
+ /**
+ * Indicates an error connecting to a zookeeper cluster.
+ */
+ public class ZooKeeperConnectionException extends Exception {
+ public ZooKeeperConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Encapsulates a user's credentials and has the ability to authenticate them through a
+ * {@link ZooKeeper} client.
+ */
+ public interface Credentials {
+
+ /**
+ * A set of {@code Credentials} that performs no authentication.
+ */
+ Credentials NONE = new Credentials() {
+ @Override public void authenticate(ZooKeeper zooKeeper) {
+ // noop
+ }
+
+ @Override public String scheme() {
+ return null;
+ }
+
+ @Override public byte[] authToken() {
+ return null;
+ }
+ };
+
+ /**
+ * Authenticates these credentials against the given {@code ZooKeeper} client.
+ *
+ * @param zooKeeper the client to authenticate
+ */
+ void authenticate(ZooKeeper zooKeeper);
+
+ /**
+ * Returns the authentication scheme these credentials are for.
+ *
+ * @return the scheme these credentials are for or {@code null} if no authentication is
+ * intended.
+ */
+ @Nullable
+ String scheme();
+
+ /**
+ * Returns the authentication token.
+ *
+ * @return the authentication token or {@code null} if no authentication is intended.
+ */
+ @Nullable
+ byte[] authToken();
+ }
+
+ /**
+ * Creates a set of credentials for the zoo keeper digest authentication mechanism.
+ *
+ * @param username the username to authenticate with
+ * @param password the password to authenticate with
+ * @return a set of credentials that can be used to authenticate the zoo keeper client
+ */
+ public static Credentials digestCredentials(String username, String password) {
+ MorePreconditions.checkNotBlank(username);
+ Preconditions.checkNotNull(password);
+
+ // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+ // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+ // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+ // Charset explicitly.
+ return credentials("digest", (username + ":" + password).getBytes());
+ }
+
+ /**
+ * Creates a set of credentials for the given authentication {@code scheme}.
+ *
+ * @param scheme the scheme to authenticate with
+ * @param authToken the authentication token
+ * @return a set of credentials that can be used to authenticate the zoo keeper client
+ */
+ public static Credentials credentials(final String scheme, final byte[] authToken) {
+ MorePreconditions.checkNotBlank(scheme);
+ Preconditions.checkNotNull(authToken);
+
+ return new Credentials() {
+ @Override public void authenticate(ZooKeeper zooKeeper) {
+ zooKeeper.addAuthInfo(scheme, authToken);
+ }
+
+ @Override public String scheme() {
+ return scheme;
+ }
+
+ @Override public byte[] authToken() {
+ return authToken;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (!(o instanceof Credentials)) {
+ return false;
+ }
+
+ Credentials other = (Credentials) o;
+ return new EqualsBuilder()
+ .append(scheme, other.scheme())
+ .append(authToken, other.authToken())
+ .isEquals();
+ }
+
+ @Override public int hashCode() {
+ return Objects.hashCode(scheme, authToken);
+ }
+ };
+ }
+
+ private final class SessionState {
+ private final long sessionId;
+ private final byte[] sessionPasswd;
+
+ private SessionState(long sessionId, byte[] sessionPasswd) {
+ this.sessionId = sessionId;
+ this.sessionPasswd = sessionPasswd;
+ }
+ }
+
+ private static final Logger LOG = Logger.getLogger(ZooKeeperClient.class.getName());
+
+ private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+ private final int sessionTimeoutMs;
+ private final Credentials credentials;
+ private final String zooKeeperServers;
+ // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+ // made from within long synchronized blocks.
+ private volatile ZooKeeper zooKeeper;
+ private SessionState sessionState;
+
+ private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+ private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
+
+ private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+ InetSocketAddress... addresses) {
+ return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get()}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param zooKeeperServer the first, required ZK server
+ * @param zooKeeperServers any additional servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+ InetSocketAddress... zooKeeperServers) {
+ this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+ Iterable<InetSocketAddress> zooKeeperServers) {
+ this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), zooKeeperServers);
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get()}. All successful connections will be authenticated with the given
+ * {@code credentials}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the credentials to authenticate with
+ * @param zooKeeperServer the first, required ZK server
+ * @param zooKeeperServers any additional servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+ InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
+ this(sessionTimeout, credentials, Optional.<String> absent(), combine(zooKeeperServer, zooKeeperServers));
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}. All successful connections will be authenticated with the given
+ * {@code credentials}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the credentials to authenticate with
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+ Iterable<InetSocketAddress> zooKeeperServers) {
+ this(sessionTimeout, credentials, Optional.<String> absent(), zooKeeperServers);
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}. All successful connections will be authenticated with the given
+ * {@code credentials}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the credentials to authenticate with
+ * @param chrootPath an optional chroot path
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+ Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+ this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+ this.credentials = Preconditions.checkNotNull(credentials);
+
+ if (chrootPath.isPresent()) {
+ PathUtils.validatePath(chrootPath.get());
+ }
+
+ Preconditions.checkNotNull(zooKeeperServers);
+ Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+ "Must present at least 1 ZK server");
+
+ Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ WatchedEvent event = eventQueue.take();
+ for (Watcher watcher : watchers) {
+ watcher.process(event);
+ }
+ } catch (InterruptedException e) { /* ignore */ }
+ }
+ }
+ };
+ watcherProcessor.setDaemon(true);
+ watcherProcessor.start();
+
+ Iterable<String> servers =
+ Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+ InetSocketAddressHelper.INET_TO_STR);
+ this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+ }
+
+ /**
+ * Returns true if this client has non-empty credentials set. For example, returns {@code false}
+ * if this client was constructed with {@link Credentials#NONE}.
+ *
+ * @return {@code true} if this client is configured with non-empty credentials.
+ */
+ public boolean hasCredentials() {
+ return !Strings.isNullOrEmpty(credentials.scheme()) && (credentials.authToken() != null);
+ }
+
+ /**
+ * Returns the current active ZK connection or establishes a new one if none has yet been
+ * established or a previous connection was disconnected or had its session time out. This method
+ * will attempt to re-use sessions when possible. Equivalent to:
+ * <pre>get(Amount.of(0L, ...)</pre>.
+ *
+ * @return a connected ZooKeeper client
+ * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+ * @throws InterruptedException if interrupted while waiting for a connection to be established
+ */
+ public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+ try {
+ return get(WAIT_FOREVER);
+ } catch (TimeoutException e) {
+ InterruptedException interruptedException =
+ new InterruptedException("Got an unexpected TimeoutException for 0 wait");
+ interruptedException.initCause(e);
+ throw interruptedException;
+ }
+ }
+
+ /**
+ * Returns the current active ZK connection or establishes a new one if none has yet been
+ * established or a previous connection was disconnected or had its session time out. This
+ * method will attempt to re-use sessions when possible.
+ *
+ * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+ * cluster to be established; 0 to wait forever
+ * @return a connected ZooKeeper client
+ * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+ * @throws InterruptedException if interrupted while waiting for a connection to be established
+ * @throws TimeoutException if a connection could not be established within the configured
+ * session timeout
+ */
+ public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+ throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+ if (zooKeeper == null) {
+ final CountDownLatch connected = new CountDownLatch(1);
+ Watcher watcher = new Watcher() {
+ @Override public void process(WatchedEvent event) {
+ switch (event.getType()) {
+ // Guard the None type since this watch may be used as the default watch on calls by
+ // the client outside our control.
+ case None:
+ switch (event.getState()) {
+ case Expired:
+ LOG.info("Zookeeper session expired. Event: " + event);
+ close();
+ break;
+ case SyncConnected:
+ connected.countDown();
+ break;
+ }
+ }
+
+ eventQueue.offer(event);
+ }
+ };
+
+ try {
+ zooKeeper = (sessionState != null)
+ ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+ sessionState.sessionPasswd)
+ : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+ } catch (IOException e) {
+ throw new ZooKeeperConnectionException(
+ "Problem connecting to servers: " + zooKeeperServers, e);
+ }
+
+ if (connectionTimeout.getValue() > 0) {
+ if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+ close();
+ throw new TimeoutException("Timed out waiting for a ZK connection after "
+ + connectionTimeout);
+ }
+ } else {
+ try {
+ connected.await();
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting to connect to zooKeeper");
+ close();
+ throw ex;
+ }
+ }
+ credentials.authenticate(zooKeeper);
+
+ sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+ }
+ return zooKeeper;
+ }
+
+ /**
+ * Clients that need to re-establish state after session expiration can register an
+ * {@code onExpired} command to execute.
+ *
+ * @param onExpired the {@code Command} to register
+ * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+ * removal.
+ */
+ public Watcher registerExpirationHandler(final Command onExpired) {
+ Watcher watcher = new Watcher() {
+ @Override public void process(WatchedEvent event) {
+ if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+ onExpired.execute();
+ }
+ }
+ };
+ register(watcher);
+ return watcher;
+ }
+
+ /**
+ * Clients that need to register a top-level {@code Watcher} should do so using this method. The
+ * registered {@code watcher} will remain registered across re-connects and session expiration
+ * events.
+ *
+ * @param watcher the {@code Watcher to register}
+ */
+ public void register(Watcher watcher) {
+ watchers.add(watcher);
+ }
+
+ /**
+ * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+ * registered.
+ *
+ * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+ * @return whether the given {@code Watcher} was found and removed from the active set
+ */
+ public boolean unregister(Watcher watcher) {
+ return watchers.remove(watcher);
+ }
+
+ /**
+ * Checks to see if the client might reasonably re-try an operation given the exception thrown
+ * while attempting it. If the ZooKeeper session should be expired to enable the re-try to
+ * succeed this method will expire it as a side-effect.
+ *
+ * @param e the exception to test
+ * @return true if a retry can be attempted
+ */
+ public boolean shouldRetry(KeeperException e) {
+ if (e instanceof SessionExpiredException) {
+ close();
+ }
+ return ZooKeeperUtils.isRetryable(e);
+ }
+
+ /**
+ * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent
+ * calls to this method will no-op until the next successful {@link #get}.
+ */
+ public synchronized void close() {
+ if (zooKeeper != null) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warning("Interrupted trying to close zooKeeper");
+ } finally {
+ zooKeeper = null;
+ sessionState = null;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ synchronized boolean isClosed() {
+ return zooKeeper == null;
+ }
+
+ @VisibleForTesting
+ ZooKeeper getZooKeeperClientForTests() {
+ return zooKeeper;
+ }
+}