You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:23 UTC
[33/51] [partial] aurora git commit: Move packages from
com.twitter.common to org.apache.aurora.common
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
deleted file mode 100644
index 7a32cd0..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/Group.java
+++ /dev/null
@@ -1,708 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.base.ExceptionalSupplier;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.util.BackoffHelper;
-import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups. The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
- * each member of a group.
- */
-public class Group {
- private static final Logger LOG = Logger.getLogger(Group.class.getName());
-
- private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
- private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
- private final ZooKeeperClient zkClient;
- private final ImmutableList<ACL> acl;
- private final String path;
-
- private final NodeScheme nodeScheme;
- private final Predicate<String> nodeNameFilter;
-
- private final BackoffHelper backoffHelper;
-
- /**
- * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or
- * duplicate slashes will be normalized. For example, all the following paths would create a
- * group at the normalized path /my/distributed/group:
- * <ul>
- * <li>/my/distributed/group
- * <li>/my/distributed/group/
- * <li>/my/distributed//group
- * </ul>
- *
- * @param zkClient the client to use for interactions with ZooKeeper
- * @param acl the ACL to use for creating the persistent group path if it does not already exist
- * @param path the absolute persistent path that represents this group
- * @param nodeScheme the scheme that defines how nodes are created
- */
- public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
- this.zkClient = Preconditions.checkNotNull(zkClient);
- this.acl = ImmutableList.copyOf(acl);
- this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
- this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
- nodeNameFilter = new Predicate<String>() {
- @Override public boolean apply(String nodeName) {
- return Group.this.nodeScheme.isMember(nodeName);
- }
- };
-
- backoffHelper = new BackoffHelper();
- }
-
- /**
- * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
- * {@code namePrefix} of 'member_'.
- */
- public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
- this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
- }
-
- /**
- * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
- * {@link DefaultScheme} using {@code namePrefix}.
- */
- public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
- this(zkClient, acl, path, new DefaultScheme(namePrefix));
- }
-
- public String getMemberPath(String memberId) {
- return path + "/" + MorePreconditions.checkNotBlank(memberId);
- }
-
- public String getPath() {
- return path;
- }
-
- public String getMemberId(String nodePath) {
- MorePreconditions.checkNotBlank(nodePath);
- Preconditions.checkArgument(nodePath.startsWith(path + "/"),
- "Not a member of this group[%s]: %s", path, nodePath);
-
- String memberId = StringUtils.substringAfterLast(nodePath, "/");
- Preconditions.checkArgument(nodeScheme.isMember(memberId),
- "Not a group member: %s", memberId);
- return memberId;
- }
-
- /**
- * Returns the current list of group member ids by querying ZooKeeper synchronously.
- *
- * @return the ids of all the present members of this group
- * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
- * @throws KeeperException if there was a problem reading this group's member ids
- * @throws InterruptedException if this thread is interrupted listing the group members
- */
- public Iterable<String> getMemberIds()
- throws ZooKeeperConnectionException, KeeperException, InterruptedException {
- return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
- }
-
- /**
- * Gets the data for one of this groups members by querying ZooKeeper synchronously.
- *
- * @param memberId the id of the member whose data to retrieve
- * @return the data associated with the {@code memberId}
- * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
- * @throws KeeperException if there was a problem reading this member's data
- * @throws InterruptedException if this thread is interrupted retrieving the member data
- */
- public byte[] getMemberData(String memberId)
- throws ZooKeeperConnectionException, KeeperException, InterruptedException {
- return zkClient.get().getData(getMemberPath(memberId), false, null);
- }
-
- /**
- * Represents membership in a distributed group.
- */
- public interface Membership {
-
- /**
- * Returns the persistent ZooKeeper path that represents this group.
- */
- String getGroupPath();
-
- /**
- * Returns the id (ZooKeeper node name) of this group member. May change over time if the
- * ZooKeeper session expires.
- */
- String getMemberId();
-
- /**
- * Returns the full ZooKeeper path to this group member. May change over time if the
- * ZooKeeper session expires.
- */
- String getMemberPath();
-
- /**
- * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
- * {@link Group#join()}.
- *
- * @return the new membership data
- * @throws UpdateException if there was a problem updating the membership data
- */
- byte[] updateMemberData() throws UpdateException;
-
- /**
- * Cancels group membership by deleting the associated ZooKeeper member node.
- *
- * @throws JoinException if there is a problem deleting the node
- */
- void cancel() throws JoinException;
- }
-
- /**
- * Indicates an error joining a group.
- */
- public static class JoinException extends Exception {
- public JoinException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Indicates an error updating a group member's data.
- */
- public static class UpdateException extends Exception {
- public UpdateException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Equivalent to calling {@code join(null, null)}.
- */
- public final Membership join() throws JoinException, InterruptedException {
- return join(NO_MEMBER_DATA, null);
- }
-
- /**
- * Equivalent to calling {@code join(memberData, null)}.
- */
- public final Membership join(Supplier<byte[]> memberData)
- throws JoinException, InterruptedException {
-
- return join(memberData, null);
- }
-
- /**
- * Equivalent to calling {@code join(null, onLoseMembership)}.
- */
- public final Membership join(@Nullable final Command onLoseMembership)
- throws JoinException, InterruptedException {
-
- return join(NO_MEMBER_DATA, onLoseMembership);
- }
-
- /**
- * Joins this group and returns the resulting Membership when successful. Membership will be
- * automatically cancelled when the current jvm process dies; however the returned Membership
- * object can be used to cancel membership earlier. Unless
- * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will
- * be maintained by re-establishing it silently in the background.
- *
- * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an
- * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
- * membership in the group.
- *
- * @param memberData a supplier of the data to store in the member node
- * @param onLoseMembership a callback to notify when membership is lost
- * @return a Membership object with the member details
- * @throws JoinException if there was a problem joining the group
- * @throws InterruptedException if this thread is interrupted awaiting completion of the join
- */
- public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
- throws JoinException, InterruptedException {
-
- Preconditions.checkNotNull(memberData);
- ensurePersistentGroupPath();
-
- final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
- return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() {
- @Override public Membership get() throws JoinException {
- try {
- return groupJoiner.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new JoinException("Interrupted trying to join group at path: " + path, e);
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
- return null;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
- return null;
- } else {
- throw new JoinException("Problem joining partition group at path: " + path, e);
- }
- }
- }
- });
- }
-
- private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
- backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
- @Override public Boolean get() throws JoinException {
- try {
- ZooKeeperUtils.ensurePath(zkClient, acl, path);
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
- return false;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e);
- return false;
- } else {
- throw new JoinException("Problem ensuring group at path: " + path, e);
- }
- }
- }
- });
- }
-
- private class ActiveMembership implements Membership {
- private final Supplier<byte[]> memberData;
- private final Command onLoseMembership;
- private String nodePath;
- private String memberId;
- private volatile boolean cancelled;
- private byte[] membershipData;
-
- public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
- this.memberData = memberData;
- this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
- }
-
- @Override
- public String getGroupPath() {
- return path;
- }
-
- @Override
- public synchronized String getMemberId() {
- return memberId;
- }
-
- @Override
- public synchronized String getMemberPath() {
- return nodePath;
- }
-
- @Override
- public synchronized byte[] updateMemberData() throws UpdateException {
- byte[] membershipData = memberData.get();
- if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
- try {
- zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
- this.membershipData = membershipData;
- } catch (KeeperException e) {
- throw new UpdateException("Problem updating membership data.", e);
- } catch (InterruptedException e) {
- throw new UpdateException("Interrupted attempting to update membership data.", e);
- } catch (ZooKeeperConnectionException e) {
- throw new UpdateException(
- "Could not connect to the ZooKeeper cluster to update membership data.", e);
- }
- }
- return membershipData;
- }
-
- @Override
- public synchronized void cancel() throws JoinException {
- if (!cancelled) {
- try {
- backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
- @Override public Boolean get() throws JoinException {
- try {
- zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
- return false;
- } catch (NoNodeException e) {
- LOG.info("Membership already cancelled, node at path: " + nodePath +
- " has been deleted");
- return true;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e);
- return false;
- } else {
- throw new JoinException("Problem cancelling membership: " + nodePath, e);
- }
- }
- }
- });
- cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new JoinException("Problem cancelling membership: " + nodePath, e);
- }
- }
- }
-
- private class CancelledException extends IllegalStateException { /* marker */ }
-
- synchronized Membership join()
- throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
- if (cancelled) {
- throw new CancelledException();
- }
-
- if (nodePath == null) {
- // Re-join if our ephemeral node goes away due to session expiry - only needs to be
- // registered once.
- zkClient.registerExpirationHandler(new Command() {
- @Override public void execute() {
- tryJoin();
- }
- });
- }
-
- byte[] membershipData = memberData.get();
- String nodeName = nodeScheme.createName(membershipData);
- CreateMode createMode = nodeScheme.isSequential()
- ? CreateMode.EPHEMERAL_SEQUENTIAL
- : CreateMode.EPHEMERAL;
- nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
- memberId = Group.this.getMemberId(nodePath);
- LOG.info("Set group member ID to " + memberId);
- this.membershipData = membershipData;
-
- // Re-join if our ephemeral node goes away due to maliciousness.
- zkClient.get().exists(nodePath, new Watcher() {
- @Override public void process(WatchedEvent event) {
- if (event.getType() == EventType.NodeDeleted) {
- tryJoin();
- }
- }
- });
-
- return this;
- }
-
- private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
- new ExceptionalSupplier<Boolean, InterruptedException>() {
- @Override public Boolean get() throws InterruptedException {
- try {
- join();
- return true;
- } catch (CancelledException e) {
- // Lost a cancel race - that's ok.
- return true;
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
- return false;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e);
- return false;
- } else {
- throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
- }
- }
- }
- };
-
- private synchronized void tryJoin() {
- onLoseMembership.execute();
- try {
- backoffHelper.doUntilSuccess(tryJoin);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(
- String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
- }
- }
- }
-
- /**
- * An interface to an object that listens for changes to a group's membership.
- */
- public interface GroupChangeListener {
-
- /**
- * Called whenever group membership changes with the new list of member ids.
- *
- * @param memberIds the current member ids
- */
- void onGroupChange(Iterable<String> memberIds);
- }
-
- /**
- * An interface that dictates the scheme to use for storing and filtering nodes that represent
- * members of a distributed group.
- */
- public interface NodeScheme {
- /**
- * Determines if a child node is a member of a group by examining the node's name.
- *
- * @param nodeName the name of a child node found in a group
- * @return {@code true} if {@code nodeName} identifies a group member in this scheme
- */
- boolean isMember(String nodeName);
-
- /**
- * Generates a node name for the node representing this process in the distributed group.
- *
- * @param membershipData the data that will be stored in this node
- * @return the name for the node that will represent this process in the group
- */
- String createName(byte[] membershipData);
-
- /**
- * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
- *
- * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
- */
- boolean isSequential();
- }
-
- /**
- * Indicates an error watching a group.
- */
- public static class WatchException extends Exception {
- public WatchException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Watches this group for the lifetime of this jvm process. This method will block until the
- * current group members are available, notify the {@code groupChangeListener} and then return.
- * All further changes to the group membership will cause notifications on a background thread.
- *
- * @param groupChangeListener the listener to notify of group membership change events
- * @return A command which, when executed, will stop watching the group.
- * @throws WatchException if there is a problem generating the 1st group membership list
- * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
- */
- public final Command watch(final GroupChangeListener groupChangeListener)
- throws WatchException, InterruptedException {
- Preconditions.checkNotNull(groupChangeListener);
-
- try {
- ensurePersistentGroupPath();
- } catch (JoinException e) {
- throw new WatchException("Failed to create group path: " + path, e);
- }
-
- final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
- backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() {
- @Override public Boolean get() throws WatchException {
- try {
- groupMonitor.watchGroup();
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new WatchException("Interrupted trying to watch group at path: " + path, e);
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
- return null;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
- return null;
- } else {
- throw new WatchException("Problem trying to watch group at path: " + path, e);
- }
- }
- }
- });
- return new Command() {
- @Override public void execute() {
- groupMonitor.stopWatching();
- }
- };
- }
-
- /**
- * Helps continuously monitor a group for membership changes.
- */
- private class GroupMonitor {
- private final GroupChangeListener groupChangeListener;
- private volatile boolean stopped = false;
- private Set<String> members;
-
- GroupMonitor(GroupChangeListener groupChangeListener) {
- this.groupChangeListener = groupChangeListener;
- }
-
- private final Watcher groupWatcher = new Watcher() {
- @Override public final void process(WatchedEvent event) {
- if (event.getType() == EventType.NodeChildrenChanged) {
- tryWatchGroup();
- }
- }
- };
-
- private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
- new ExceptionalSupplier<Boolean, InterruptedException>() {
- @Override public Boolean get() throws InterruptedException {
- try {
- watchGroup();
- return true;
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
- return false;
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e);
- return false;
- } else {
- throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
- }
- }
- }
- };
-
- private void tryWatchGroup() {
- if (stopped) {
- return;
- }
-
- try {
- backoffHelper.doUntilSuccess(tryWatchGroup);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(
- String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
- }
- }
-
- private void watchGroup()
- throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
- if (stopped) {
- return;
- }
-
- List<String> children = zkClient.get().getChildren(path, groupWatcher);
- setMembers(Iterables.filter(children, nodeNameFilter));
- }
-
- private void stopWatching() {
- // TODO(William Farner): Cancel the watch when
- // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
- LOG.info("Stopping watch on " + this);
- stopped = true;
- }
-
- synchronized void setMembers(Iterable<String> members) {
- if (stopped) {
- LOG.info("Suppressing membership update, no longer watching " + this);
- return;
- }
-
- if (this.members == null) {
- // Reset our watch on the group if session expires - only needs to be registered once.
- zkClient.registerExpirationHandler(new Command() {
- @Override public void execute() {
- tryWatchGroup();
- }
- });
- }
-
- Set<String> membership = ImmutableSet.copyOf(members);
- if (!membership.equals(this.members)) {
- groupChangeListener.onGroupChange(members);
- this.members = membership;
- }
- }
- }
-
- /**
- * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
- * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
- * prefix is "member_", the node's full path will look something like
- * {@code /discovery/servicename/member_0000000007}.
- */
- public static class DefaultScheme implements NodeScheme {
- private final String namePrefix;
- private final Pattern namePattern;
-
- /**
- * Creates a sequential node scheme based on the given node name prefix.
- *
- * @param namePrefix the prefix for the names of the member nodes
- */
- public DefaultScheme(String namePrefix) {
- this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
- namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
- }
-
- @Override
- public boolean isMember(String nodeName) {
- return namePattern.matcher(nodeName).matches();
- }
-
- @Override
- public String createName(byte[] membershipData) {
- return namePrefix;
- }
-
- @Override
- public boolean isSequential() {
- return true;
- }
- }
-
- @Override
- public String toString() {
- return "Group " + path;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
deleted file mode 100644
index 4cdf5f2..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Ordering;
-import com.twitter.common.zookeeper.Group.GroupChangeListener;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.Membership;
-import com.twitter.common.zookeeper.Group.UpdateException;
-import com.twitter.common.zookeeper.Group.WatchException;
-import org.apache.zookeeper.data.ACL;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
- * A typical usage would have a client on each of several hosts joining a logical partition (a
- * "partition group") that represents some shared work. Clients could then process a subset of a
- * full body of work by testing any given item of work with their partition filter.
- *
- * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
- * partition as explained in {@link #join()}.
- *
- * @author John Sirois
- */
-public class Partitioner {
-
- private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());
-
- private volatile int groupSize;
- private volatile int groupIndex;
- private final Group group;
-
- /**
- * Constructs a representation of a partition group but does not join it. Note that the partition
- * group path will be created as a persistent zookeeper path if it does not already exist.
- *
- * @param zkClient a client to use for joining the partition group and watching its membership
- * @param acl the acl for this partition group
- * @param path a zookeeper path that represents the partition group
- */
- public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
- group = new Group(zkClient, acl, path);
- }
-
- @VisibleForTesting
- int getGroupSize() {
- return groupSize;
- }
-
- /**
- * Represents a slice of a partition group. The partition is dynamic and will adjust its size as
- * members join and leave its partition group.
- */
- public abstract static class Partition implements Predicate<Long>, Membership {
-
- /**
- * Returns {@code true} if the given {@code value} is a member of this partition at this time.
- */
- public abstract boolean isMember(long value);
-
- /**
- * Gets number of members in the group at this time.
- *
- * @return number of members in the ZK group at this time.
- */
- public abstract int getNumPartitions();
-
- /**
- * Evaluates partition membership based on the given {@code value}'s hash code. If the value
- * is null it is never a member of a partition.
- */
- boolean isMember(Object value) {
- return (value != null) && isMember(value.hashCode());
- }
-
- /**
- * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
- * overhead.
- */
- @Override
- public boolean apply(@Nullable Long input) {
- return (input != null) && isMember(input);
- }
- }
-
- /**
- * Attempts to join the partition group and claim a slice. When successful, a predicate is
- * returned that can be used to test whether or not an item belongs to this partition. The
- * predicate is dynamic such that as the group is further partitioned or partitions merge the
- * predicate will claim a narrower or wider swath of the partition space respectively. Partition
- * creation and merging is not instantaneous and clients should expect independent partitions to
- * claim ownership of some items when partition membership is in flux. It is only in the steady
- * state that a client should expect independent partitions to divide the partition space evenly
- * and without overlap.
- *
- * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
- *
- * @return the partition representing the slice of the partition group this member can claim
- * @throws JoinException if there was a problem joining the partition group
- * @throws InterruptedException if interrupted while waiting to join the partition group
- */
- public final Partition join() throws JoinException, InterruptedException {
- final Membership membership = group.join();
- try {
- group.watch(createGroupChangeListener(membership));
- } catch (WatchException e) {
- membership.cancel();
- throw new JoinException("Problem establishing watch on group after joining it", e);
- }
- return new Partition() {
- @Override public boolean isMember(long value) {
- return (value % groupSize) == groupIndex;
- }
-
- @Override public int getNumPartitions() {
- return groupSize;
- }
-
- @Override public String getGroupPath() {
- return membership.getGroupPath();
- }
-
- @Override public String getMemberId() {
- return membership.getMemberId();
- }
-
- @Override public String getMemberPath() {
- return membership.getMemberPath();
- }
-
- @Override public byte[] updateMemberData() throws UpdateException {
- return membership.updateMemberData();
- }
-
- @Override public void cancel() throws JoinException {
- membership.cancel();
- }
- };
- }
-
- @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
- return new GroupChangeListener() {
- @Override public void onGroupChange(Iterable<String> memberIds) {
- List<String> members = Ordering.natural().sortedCopy(memberIds);
- int newSize = members.size();
- int newIndex = members.indexOf(membership.getMemberId());
-
- LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
- membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));
-
- groupSize = newSize;
- groupIndex = newIndex;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
deleted file mode 100644
index 2021f51..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A logical set of servers registered in ZooKeeper. Intended to be used by both servers in a
- * common service and their clients.
- *
- * TODO(William Farner): Explore decoupling this from thrift.
- */
-public interface ServerSet extends DynamicHostSet<ServiceInstance> {
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @param status the current service status
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)}
- */
- @Deprecated
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- Status status) throws JoinException, InterruptedException;
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- */
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints)
- throws JoinException, InterruptedException;
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @param shardId Unique shard identifier for this member of the service.
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- */
- EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- int shardId) throws JoinException, InterruptedException;
-
- /**
- * A handle to a service endpoint's status data that allows updating it to track current events.
- */
- public interface EndpointStatus {
-
- /**
- * Attempts to update the status of the service endpoint associated with this endpoint. If the
- * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set.
- *
- * @param status the current status of the endpoint
- * @throws UpdateException if there was a problem writing the update
- * @deprecated Support for mutable status is deprecated. Please use {@link #leave()}
- */
- @Deprecated
- void update(Status status) throws UpdateException;
-
- /**
- * Removes the endpoint from the server set.
- *
- * @throws UpdateException if there was a problem leaving the ServerSet.
- */
- void leave() throws UpdateException;
- }
-
- /**
- * Indicates an error updating a service's status information.
- */
- public static class UpdateException extends Exception {
- public UpdateException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public UpdateException(String message) {
- super(message);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index b00015e..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import com.google.gson.Gson;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Function;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.io.Codec;
-import com.twitter.common.io.CompatibilityCodec;
-import com.twitter.common.io.ThriftCodec;
-import com.twitter.common.util.BackoffHelper;
-import com.twitter.common.zookeeper.Group.GroupChangeListener;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.Membership;
-import com.twitter.common.zookeeper.Group.WatchException;
-import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet}.
- */
-public class ServerSetImpl implements ServerSet {
- private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName());
-
- @CmdLine(name = "serverset_encode_json",
- help = "If true, use JSON for encoding server set information."
- + " Defaults to true (use JSON).")
- private static final Arg<Boolean> ENCODE_JSON = Arg.create(true);
-
- private final ZooKeeperClient zkClient;
- private final Group group;
- private final Codec<ServiceInstance> codec;
- private final BackoffHelper backoffHelper;
-
- /**
- * Creates a new ServerSet using open ZooKeeper node ACLs.
- *
- * @param zkClient the client to use for interactions with ZooKeeper
- * @param path the name-service path of the service to connect to
- */
- public ServerSetImpl(ZooKeeperClient zkClient, String path) {
- this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
- }
-
- /**
- * Creates a new ServerSet for the given service {@code path}.
- *
- * @param zkClient the client to use for interactions with ZooKeeper
- * @param acl the ACL to use for creating the persistent group path if it does not already exist
- * @param path the name-service path of the service to connect to
- */
- public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
- this(zkClient, new Group(zkClient, acl, path), createDefaultCodec());
- }
-
- /**
- * Creates a new ServerSet using the given service {@code group}.
- *
- * @param zkClient the client to use for interactions with ZooKeeper
- * @param group the server group
- */
- public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
- this(zkClient, group, createDefaultCodec());
- }
-
- /**
- * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
- *
- * @param zkClient the client to use for interactions with ZooKeeper
- * @param group the server group
- * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
- * from a byte array
- */
- public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
- this.zkClient = checkNotNull(zkClient);
- this.group = checkNotNull(group);
- this.codec = checkNotNull(codec);
-
- // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
- backoffHelper = new BackoffHelper();
- }
-
- @VisibleForTesting
- ZooKeeperClient getZkClient() {
- return zkClient;
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints)
- throws JoinException, InterruptedException {
-
- LOG.log(Level.WARNING,
- "Joining a ServerSet without a shard ID is deprecated and will soon break.");
- return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- int shardId) throws JoinException, InterruptedException {
-
- return join(endpoint, additionalEndpoints, Optional.of(shardId));
- }
-
- private EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- Optional<Integer> shardId) throws JoinException, InterruptedException {
-
- checkNotNull(endpoint);
- checkNotNull(additionalEndpoints);
-
- final MemberStatus memberStatus =
- new MemberStatus(endpoint, additionalEndpoints, shardId);
- Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() {
- @Override public byte[] get() {
- return memberStatus.serializeServiceInstance();
- }
- };
- final Membership membership = group.join(serviceInstanceSupplier);
-
- return new EndpointStatus() {
- @Override public void update(Status status) throws UpdateException {
- checkNotNull(status);
- LOG.warning("This method is deprecated. Please use leave() instead.");
- if (status == Status.DEAD) {
- leave();
- } else {
- LOG.warning("Status update has been ignored");
- }
- }
-
- @Override public void leave() throws UpdateException {
- memberStatus.leave(membership);
- }
- };
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- Status status) throws JoinException, InterruptedException {
-
- LOG.warning("This method is deprecated. Please do not specify a status field.");
- if (status != Status.ALIVE) {
- LOG.severe("**************************************************************************\n"
- + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n"
- + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status
- + "\n**************************************************************************");
- }
- return join(endpoint, additionalEndpoints);
- }
-
- @Override
- public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
- try {
- return serverSetWatcher.watch();
- } catch (WatchException e) {
- throw new MonitorException("ZooKeeper watch failed.", e);
- } catch (InterruptedException e) {
- throw new MonitorException("Interrupted while watching ZooKeeper.", e);
- }
- }
-
- @Override
- public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- LOG.warning("This method is deprecated. Please use watch instead.");
- watch(monitor);
- }
-
- private class MemberStatus {
- private final InetSocketAddress endpoint;
- private final Map<String, InetSocketAddress> additionalEndpoints;
- private final Optional<Integer> shardId;
-
- private MemberStatus(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- Optional<Integer> shardId) {
-
- this.endpoint = endpoint;
- this.additionalEndpoints = additionalEndpoints;
- this.shardId = shardId;
- }
-
- synchronized void leave(Membership membership) throws UpdateException {
- try {
- membership.cancel();
- } catch (JoinException e) {
- throw new UpdateException(
- "Failed to auto-cancel group membership on transition to DEAD status", e);
- }
- }
-
- byte[] serializeServiceInstance() {
- ServiceInstance serviceInstance = new ServiceInstance(
- ServerSets.toEndpoint(endpoint),
- Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
- Status.ALIVE);
-
- if (shardId.isPresent()) {
- serviceInstance.setShard(shardId.get());
- }
-
- LOG.fine("updating endpoint data to:\n\t" + serviceInstance);
- try {
- return ServerSets.serializeServiceInstance(serviceInstance, codec);
- } catch (IOException e) {
- throw new IllegalStateException("Unexpected problem serializing thrift struct " +
- serviceInstance + "to a byte[]", e);
- }
- }
- }
-
- private static class ServiceInstanceFetchException extends RuntimeException {
- ServiceInstanceFetchException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- private static class ServiceInstanceDeletedException extends RuntimeException {
- ServiceInstanceDeletedException(String path) {
- super(path);
- }
- }
-
- private class ServerSetWatcher {
- private final ZooKeeperClient zkClient;
- private final HostChangeMonitor<ServiceInstance> monitor;
- @Nullable private ImmutableSet<ServiceInstance> serverSet;
-
- ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
- this.zkClient = zkClient;
- this.monitor = monitor;
- }
-
- public Command watch() throws WatchException, InterruptedException {
- Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() {
- @Override public void execute() {
- // Servers may have changed Status while we were disconnected from ZooKeeper, check and
- // re-register our node watches.
- rebuildServerSet();
- }
- });
-
- try {
- return group.watch(new GroupChangeListener() {
- @Override public void onGroupChange(Iterable<String> memberIds) {
- notifyGroupChange(memberIds);
- }
- });
- } catch (WatchException e) {
- zkClient.unregister(onExpirationWatcher);
- throw e;
- } catch (InterruptedException e) {
- zkClient.unregister(onExpirationWatcher);
- throw e;
- }
- }
-
- private ServiceInstance getServiceInstance(final String nodePath) {
- try {
- return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() {
- @Override public ServiceInstance get() {
- try {
- byte[] data = zkClient.get().getData(nodePath, false, null);
- return ServerSets.deserializeServiceInstance(data, codec);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ServiceInstanceFetchException(
- "Interrupted updating service data for: " + nodePath, e);
- } catch (ZooKeeperConnectionException e) {
- LOG.log(Level.WARNING,
- "Temporary error trying to updating service data for: " + nodePath, e);
- return null;
- } catch (NoNodeException e) {
- invalidateNodePath(nodePath);
- throw new ServiceInstanceDeletedException(nodePath);
- } catch (KeeperException e) {
- if (zkClient.shouldRetry(e)) {
- LOG.log(Level.WARNING,
- "Temporary error trying to update service data for: " + nodePath, e);
- return null;
- } else {
- throw new ServiceInstanceFetchException(
- "Failed to update service data for: " + nodePath, e);
- }
- } catch (IOException e) {
- throw new ServiceInstanceFetchException(
- "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
- }
- }
- });
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ServiceInstanceFetchException(
- "Interrupted trying to update service data for: " + nodePath, e);
- }
- }
-
- private final LoadingCache<String, ServiceInstance> servicesByMemberId =
- CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
- @Override public ServiceInstance load(String memberId) {
- return getServiceInstance(group.getMemberPath(memberId));
- }
- });
-
- private void rebuildServerSet() {
- Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
- servicesByMemberId.invalidateAll();
- notifyGroupChange(memberIds);
- }
-
- private String invalidateNodePath(String deletedPath) {
- String memberId = group.getMemberId(deletedPath);
- servicesByMemberId.invalidate(memberId);
- return memberId;
- }
-
- private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
- new Function<String, ServiceInstance>() {
- @Override public ServiceInstance apply(String memberId) {
- // This get will trigger a fetch
- try {
- return servicesByMemberId.getUnchecked(memberId);
- } catch (UncheckedExecutionException e) {
- Throwable cause = e.getCause();
- if (!(cause instanceof ServiceInstanceDeletedException)) {
- Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
- throw new IllegalStateException(
- "Unexpected error fetching member data for: " + memberId, e);
- }
- return null;
- }
- }
- };
-
- private synchronized void notifyGroupChange(Iterable<String> memberIds) {
- ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
- Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
-
- // Ignore no-op state changes except for the 1st when we've seen no group yet.
- if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
- SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
- // Implicit removal from servicesByMemberId.
- existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
-
- Iterable<ServiceInstance> serviceInstances = Iterables.filter(
- Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
-
- notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
- }
- }
-
- private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
- // ZK nodes may have changed if there was a session expiry for a server in the server set, but
- // if the server's status has not changed, we can skip any onChange updates.
- if (!currentServerSet.equals(serverSet)) {
- if (currentServerSet.isEmpty()) {
- LOG.warning("server set empty for path " + group.getPath());
- } else {
- if (LOG.isLoggable(Level.INFO)) {
- if (serverSet == null) {
- LOG.info("received initial membership " + currentServerSet);
- } else {
- logChange(Level.INFO, currentServerSet);
- }
- }
- }
- serverSet = currentServerSet;
- monitor.onChange(serverSet);
- }
- }
-
- private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) {
- StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
- if (serverSet.size() != newServerSet.size()) {
- message.append("from ").append(serverSet.size())
- .append(" members to ").append(newServerSet.size());
- }
-
- Joiner joiner = Joiner.on("\n\t\t");
-
- SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
- if (!left.isEmpty()) {
- message.append("\n\tleft:\n\t\t").append(joiner.join(left));
- }
-
- SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
- if (!joined.isEmpty()) {
- message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
- }
-
- LOG.log(level, message.toString());
- }
- }
-
- private static class EndpointSchema {
- final String host;
- final Integer port;
-
- EndpointSchema(Endpoint endpoint) {
- Preconditions.checkNotNull(endpoint);
- this.host = endpoint.getHost();
- this.port = endpoint.getPort();
- }
-
- String getHost() {
- return host;
- }
-
- Integer getPort() {
- return port;
- }
- }
-
- private static class ServiceInstanceSchema {
- final EndpointSchema serviceEndpoint;
- final Map<String, EndpointSchema> additionalEndpoints;
- final Status status;
- final Integer shard;
-
- ServiceInstanceSchema(ServiceInstance instance) {
- this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
- if (instance.getAdditionalEndpoints() != null) {
- this.additionalEndpoints = Maps.transformValues(
- instance.getAdditionalEndpoints(),
- new Function<Endpoint, EndpointSchema>() {
- @Override public EndpointSchema apply(Endpoint endpoint) {
- return new EndpointSchema(endpoint);
- }
- }
- );
- } else {
- this.additionalEndpoints = Maps.newHashMap();
- }
- this.status = instance.getStatus();
- this.shard = instance.isSetShard() ? instance.getShard() : null;
- }
-
- EndpointSchema getServiceEndpoint() {
- return serviceEndpoint;
- }
-
- Map<String, EndpointSchema> getAdditionalEndpoints() {
- return additionalEndpoints;
- }
-
- Status getStatus() {
- return status;
- }
-
- Integer getShard() {
- return shard;
- }
- }
-
- /**
- * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the
- * __isset_bit_vector internal thrift struct field that tracks primitive types.
- */
- private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
- private static final Charset ENCODING = Charsets.UTF_8;
- private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class;
- private final Gson gson = new Gson();
-
- @Override
- public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
- Writer w = new OutputStreamWriter(sink, ENCODING);
- gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
- w.flush();
- }
-
- @Override
- public ServiceInstance deserialize(InputStream source) throws IOException {
- ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS);
- Endpoint primary = new Endpoint(
- output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort());
- Map<String, Endpoint> additional = Maps.transformValues(
- output.getAdditionalEndpoints(),
- new Function<EndpointSchema, Endpoint>() {
- @Override public Endpoint apply(EndpointSchema endpoint) {
- return new Endpoint(endpoint.getHost(), endpoint.getPort());
- }
- }
- );
- ServiceInstance instance =
- new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus());
- if (output.getShard() != null) {
- instance.setShard(output.getShard());
- }
- return instance;
- }
- }
-
- private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) {
- final Codec<ServiceInstance> json = new AdaptedJsonCodec();
- final Codec<ServiceInstance> thrift =
- ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL);
- final Predicate<byte[]> recognizer = new Predicate<byte[]>() {
- public boolean apply(byte[] input) {
- return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding;
- }
- };
-
- if (useJsonEncoding) {
- return CompatibilityCodec.create(json, thrift, 2, recognizer);
- }
- return CompatibilityCodec.create(thrift, json, 2, recognizer);
- }
-
- /**
- * Creates a codec for {@link ServiceInstance} objects that uses Thrift binary encoding, and can
- * decode both Thrift and JSON encodings.
- *
- * @return a new codec instance.
- */
- public static Codec<ServiceInstance> createThriftCodec() {
- return createCodec(false);
- }
-
- /**
- * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode
- * both Thrift and JSON encodings.
- *
- * @return a new codec instance.
- */
- public static Codec<ServiceInstance> createJsonCodec() {
- return createCodec(true);
- }
-
- /**
- * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON
- * encoding, depending on whether the command line argument <tt>serverset_json_encofing</tt> is
- * set to <tt>true</tt>, and can decode both Thrift and JSON encodings.
- *
- * @return a new codec instance.
- */
- public static Codec<ServiceInstance> createDefaultCodec() {
- return createCodec(ENCODE_JSON.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
deleted file mode 100644
index fc7ea88..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.Function;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.io.Codec;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-/**
- * Common ServerSet related functions
- */
-public class ServerSets {
-
- private ServerSets() {
- // Utility class.
- }
-
- /**
- * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
- */
- public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
- new Function<InetSocketAddress, Endpoint>() {
- @Override public Endpoint apply(InetSocketAddress address) {
- return ServerSets.toEndpoint(address);
- }
- };
-
- /**
- * Creates a server set that registers at a single path applying the given ACL to all nodes
- * created in the path.
- *
- * @param zkClient ZooKeeper client to register with.
- * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
- * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set)
- * @return A server set that registers at {@code zkPath}.
- */
- public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
- return create(zkClient, acl, ImmutableSet.of(zkPath));
- }
-
- /**
- * Creates a server set that registers at one or multiple paths applying the given ACL to all
- * nodes created in the paths.
- *
- * @param zkClient ZooKeeper client to register with.
- * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
- * @param zkPaths Paths to register at, must be non-empty.
- * @return A server set that registers at the given {@code zkPath}s.
- */
- public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) {
- Preconditions.checkNotNull(zkClient);
- MorePreconditions.checkNotBlank(acl);
- MorePreconditions.checkNotBlank(zkPaths);
-
- if (zkPaths.size() == 1) {
- return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths));
- } else {
- ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
- for (String path : zkPaths) {
- builder.add(new ServerSetImpl(zkClient, acl, path));
- }
- return new CompoundServerSet(builder.build());
- }
- }
-
- /**
- * Returns a serialized Thrift service instance object, with given endpoints and codec.
- *
- * @param serviceInstance the Thrift service instance object to be serialized
- * @param codec the codec to use to serialize a Thrift service instance object
- * @return byte array that contains a serialized Thrift service instance
- */
- public static byte[] serializeServiceInstance(
- ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
-
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- codec.serialize(serviceInstance, output);
- return output.toByteArray();
- }
-
- /**
- * Serializes a service instance based on endpoints.
- * @see #serializeServiceInstance(ServiceInstance, Codec)
- *
- * @param address the target address of the service instance
- * @param additionalEndpoints additional endpoints of the service instance
- * @param status service status
- */
- public static byte[] serializeServiceInstance(
- InetSocketAddress address,
- Map<String, Endpoint> additionalEndpoints,
- Status status,
- Codec<ServiceInstance> codec) throws IOException {
-
- ServiceInstance serviceInstance =
- new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
- return serializeServiceInstance(serviceInstance, codec);
- }
-
- /**
- * Creates a service instance object deserialized from byte array.
- *
- * @param data the byte array contains a serialized Thrift service instance
- * @param codec the codec to use to deserialize the byte array
- */
- public static ServiceInstance deserializeServiceInstance(
- byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
- return codec.deserialize(new ByteArrayInputStream(data));
- }
-
- /**
- * Creates an endpoint for the given InetSocketAddress.
- *
- * @param address the target address to create the endpoint for
- */
- public static Endpoint toEndpoint(InetSocketAddress address) {
- return new Endpoint(address.getHostName(), address.getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
deleted file mode 100644
index 91e28f2..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.zookeeper.Candidate.Leader;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
-import com.twitter.common.zookeeper.ServerSet.UpdateException;
-import com.twitter.thrift.Status;
-
-/**
- * A service that uses master election to only allow a single instance of the server to join
- * the {@link ServerSet} at a time.
- */
-public class SingletonService {
- private static final Logger LOG = Logger.getLogger(SingletonService.class.getName());
-
- @VisibleForTesting
- static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
- /**
- * Creates a candidate that can be combined with an existing server set to form a singleton
- * service using {@link #SingletonService(ServerSet, Candidate)}.
- *
- * @param zkClient The ZooKeeper client to use.
- * @param servicePath The path where service nodes live.
- * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
- * @return A candidate that can be housed with a standard server set under a single zk path.
- */
- public static Candidate createSingletonCandidate(
- ZooKeeperClient zkClient,
- String servicePath,
- Iterable<ACL> acl) {
-
- return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
- }
-
- private final ServerSet serverSet;
- private final Candidate candidate;
-
- /**
- * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default
- * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
- */
- public SingletonService(ZooKeeperClient zkClient, String servicePath) {
- this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
-
- /**
- * Creates a new singleton service, identified by {@code servicePath}. All nodes related to the
- * service (for both leader election and service registration) will live under the path and each
- * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s
- * are used to manage a singleton service - one for leader election, and another for the
- * {@code ServerSet} where the leader's endpoints are registered. Leadership election should
- * guarantee that at most one instance will ever exist in the ServerSet at once.
- *
- * @param zkClient The ZooKeeper client to use.
- * @param servicePath The path where service nodes live.
- * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
- */
- public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) {
- this(
- new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)),
- createSingletonCandidate(zkClient, servicePath, acl));
- }
-
- /**
- * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
- * advertises itself in the given server set once elected.
- *
- * @param serverSet The server set to advertise in on election.
- * @param candidate The candidacy to use to vie for election.
- */
- public SingletonService(ServerSet serverSet, Candidate candidate) {
- this.serverSet = Preconditions.checkNotNull(serverSet);
- this.candidate = Preconditions.checkNotNull(candidate);
- }
-
- /**
- * Attempts to lead the singleton service.
- *
- * @param endpoint The primary endpoint to register as a leader candidate in the service.
- * @param additionalEndpoints Additional endpoints that are available on the host.
- * @param status deprecated, will be ignored entirely
- * @param listener Handler to call when the candidate is elected or defeated.
- * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
- * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
- * @throws InterruptedException If the thread watching/joining the group was interrupted.
- * @deprecated The status field is deprecated. Please use
- * {@link #lead(InetSocketAddress, Map, LeadershipListener)}
- */
- @Deprecated
- public void lead(final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints,
- final Status status,
- final LeadershipListener listener)
- throws Group.WatchException, Group.JoinException, InterruptedException {
-
- if (status != Status.ALIVE) {
- LOG.severe("******************************************************************************");
- LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
- LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status);
- LOG.severe("******************************************************************************");
- } else {
- LOG.warning("******************************************************************************");
- LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
- LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)");
- LOG.warning("******************************************************************************");
- }
-
- lead(endpoint, additionalEndpoints, listener);
- }
-
- /**
- * Attempts to lead the singleton service.
- *
- * @param endpoint The primary endpoint to register as a leader candidate in the service.
- * @param additionalEndpoints Additional endpoints that are available on the host.
- * @param listener Handler to call when the candidate is elected or defeated.
- * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
- * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
- * @throws InterruptedException If the thread watching/joining the group was interrupted.
- */
- public void lead(final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints,
- final LeadershipListener listener)
- throws Group.WatchException, Group.JoinException, InterruptedException {
-
- Preconditions.checkNotNull(listener);
-
- candidate.offerLeadership(new Leader() {
- private EndpointStatus endpointStatus = null;
- @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
- listener.onLeading(new LeaderControl() {
- EndpointStatus endpointStatus = null;
- final AtomicBoolean left = new AtomicBoolean(false);
-
- // Methods are synchronized to prevent simultaneous invocations.
- @Override public synchronized void advertise()
- throws JoinException, InterruptedException {
-
- Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
- Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
- endpointStatus = serverSet.join(endpoint, additionalEndpoints);
- }
-
- @Override public synchronized void leave() throws UpdateException, JoinException {
- Preconditions.checkState(left.compareAndSet(false, true),
- "Cannot leave more than once.");
- if (endpointStatus != null) {
- endpointStatus.leave();
- }
- abdicate.execute();
- }
- });
- }
-
- @Override public void onDefeated() {
- listener.onDefeated(endpointStatus);
- }
- });
- }
-
- /**
- * A listener to be notified of changes in the leadership status.
- * Implementers should be careful to avoid blocking operations in these callbacks.
- */
- public interface LeadershipListener {
-
- /**
- * Notifies the listener that is is current leader.
- *
- * @param control A controller handle to advertise and/or leave advertised presence.
- */
- public void onLeading(LeaderControl control);
-
- /**
- * Notifies the listener that it is no longer leader. The leader should take this opportunity
- * to remove its advertisement gracefully.
- *
- * @param status A handle on the endpoint status for the advertised leader.
- */
- public void onDefeated(@Nullable EndpointStatus status);
- }
-
- /**
- * A leadership listener that decorates another listener by automatically defeating a
- * leader that has dropped its connection to ZooKeeper.
- * Note that the decision to use this over session-based mutual exclusion should not be taken
- * lightly. Any momentary connection loss due to a flaky network or a ZooKeeper server process
- * exit will cause a leader to abort.
- */
- public static class DefeatOnDisconnectLeader implements LeadershipListener {
-
- private final LeadershipListener wrapped;
- private Optional<LeaderControl> maybeControl = Optional.absent();
-
- /**
- * Creates a new leadership listener that will delegate calls to the wrapped listener, and
- * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is observed while
- * leading.
- *
- * @param zkClient The ZooKeeper client to watch for disconnect events.
- * @param wrapped The leadership listener to wrap.
- */
- public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) {
- this.wrapped = Preconditions.checkNotNull(wrapped);
-
- zkClient.register(new Watcher() {
- @Override public void process(WatchedEvent event) {
- if ((event.getType() == EventType.None)
- && (event.getState() == KeeperState.Disconnected)) {
- disconnected();
- }
- }
- });
- }
-
- private synchronized void disconnected() {
- if (maybeControl.isPresent()) {
- LOG.warning("Disconnected from ZooKeeper while leading, committing suicide.");
- try {
- wrapped.onDefeated(null);
- maybeControl.get().leave();
- } catch (UpdateException e) {
- LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
- } catch (JoinException e) {
- LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
- } finally {
- setControl(null);
- }
- } else {
- LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader.");
- }
- }
-
- private synchronized void setControl(@Nullable LeaderControl control) {
- this.maybeControl = Optional.fromNullable(control);
- }
-
- @Override public void onLeading(final LeaderControl control) {
- setControl(control);
- wrapped.onLeading(new LeaderControl() {
- @Override public void advertise() throws JoinException, InterruptedException {
- control.advertise();
- }
-
- @Override public void leave() throws UpdateException, JoinException {
- setControl(null);
- control.leave();
- }
- });
- }
-
- @Override public void onDefeated(@Nullable EndpointStatus status) {
- setControl(null);
- wrapped.onDefeated(status);
- }
- }
-
- /**
- * A controller for the state of the leader. This will be provided to the leader upon election,
- * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and
- * terminate leadership at will.
- */
- public interface LeaderControl {
-
- /**
- * Advertises the leader's server presence to clients.
- *
- * @throws JoinException If there was an error advertising.
- * @throws InterruptedException If interrupted while advertising.
- */
- void advertise() throws JoinException, InterruptedException;
-
- /**
- * Leaves candidacy for leadership, removing advertised server presence if applicable.
- *
- * @throws UpdateException If the leader's status could not be updated.
- * @throws JoinException If there was an error abdicating from leader election.
- */
- void leave() throws UpdateException, JoinException;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
deleted file mode 100644
index 56adcfe..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-/**
- * A server set that represents a fixed set of hosts.
- * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is
- * present.
- * A static server set does not support joining, but will allow normal join calls and status update
- * calls to be made.
- */
-public class StaticServerSet implements ServerSet {
-
- private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName());
-
- private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE =
- new Function<Endpoint, ServiceInstance>() {
- @Override public ServiceInstance apply(Endpoint endpoint) {
- return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE);
- }
- };
-
- private final ImmutableSet<ServiceInstance> hosts;
-
- /**
- * Creates a static server set that will reply to monitor calls immediately and exactly once with
- * the provided service instances.
- *
- * @param hosts Hosts in the static set.
- */
- public StaticServerSet(Set<ServiceInstance> hosts) {
- this.hosts = ImmutableSet.copyOf(hosts);
- }
-
- /**
- * Creates a static server set containing the provided endpoints (and no auxiliary ports) which
- * will all be in the {@link Status#ALIVE} state.
- *
- * @param endpoints Endpoints in the static set.
- * @return A static server set that will advertise the provided endpoints.
- */
- public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
- return new StaticServerSet(
- ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE)));
- }
-
- private EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> auxEndpoints,
- Optional<Integer> shardId) {
-
- LOG.warning("Attempt to join fixed server set ignored.");
- ServiceInstance joining = new ServiceInstance(
- ServerSets.toEndpoint(endpoint),
- Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
- Status.ALIVE);
- if (shardId.isPresent()) {
- joining.setShard(shardId.get());
- }
- if (!hosts.contains(joining)) {
- LOG.log(Level.SEVERE,
- "Joining instance " + joining + " does not match any member of the static set.");
- }
-
- return new EndpointStatus() {
- @Override public void leave() throws UpdateException {
- LOG.warning("Attempt to adjust state of fixed server set ignored.");
- }
-
- @Override public void update(Status status) throws UpdateException {
- LOG.warning("Attempt to adjust state of fixed server set ignored.");
- }
- };
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> auxEndpoints,
- Status status) {
-
- LOG.warning("This method is deprecated. Please do not specify a status field.");
- return join(endpoint, auxEndpoints, Optional.<Integer>absent());
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> auxEndpoints) {
-
- LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break.");
- return join(endpoint, auxEndpoints, Optional.<Integer>absent());
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> auxEndpoints,
- int shardId) throws JoinException, InterruptedException {
-
- return join(endpoint, auxEndpoints, Optional.of(shardId));
- }
-
- @Override
- public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
- monitor.onChange(hosts);
- return Commands.NOOP;
- }
-
- @Override
- public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- watch(monitor);
- }
-}