You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:33 UTC
[4/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
Revert removal of twitter/commons/zk based leadership code
See discussion here: https://issues.apache.org/jira/browse/AURORA-1840
Reviewed at https://reviews.apache.org/r/54250/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/16e4651d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/16e4651d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/16e4651d
Branch: refs/heads/master
Commit: 16e4651d5ff038dad0e9977edea7c57aeb37fe12
Parents: 8bcad84
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Thu Dec 1 09:01:33 2016 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Thu Dec 1 09:01:33 2016 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 1 +
build.gradle | 16 +-
.../aurora/common/zookeeper/Candidate.java | 78 +++
.../aurora/common/zookeeper/CandidateImpl.java | 127 ++++
.../aurora/common/zookeeper/Credentials.java | 90 +++
.../apache/aurora/common/zookeeper/Group.java | 674 +++++++++++++++++++
.../aurora/common/zookeeper/JsonCodec.java | 139 ++++
.../aurora/common/zookeeper/ServerSet.java | 74 ++
.../aurora/common/zookeeper/ServerSetImpl.java | 349 ++++++++++
.../aurora/common/zookeeper/ServerSets.java | 118 ++++
.../common/zookeeper/SingletonService.java | 114 ++++
.../common/zookeeper/SingletonServiceImpl.java | 122 ++++
.../common/zookeeper/ZooKeeperClient.java | 372 ++++++++++
.../aurora/common/zookeeper/ZooKeeperUtils.java | 167 +++++
.../testing/BaseZooKeeperClientTest.java | 140 ++++
.../zookeeper/testing/BaseZooKeeperTest.java | 46 ++
.../zookeeper/testing/ZooKeeperTestServer.java | 121 ++++
.../common/zookeeper/CandidateImplTest.java | 165 +++++
.../aurora/common/zookeeper/GroupTest.java | 321 +++++++++
.../aurora/common/zookeeper/JsonCodecTest.java | 151 +++++
.../common/zookeeper/ServerSetImplTest.java | 258 +++++++
.../aurora/common/zookeeper/ServerSetsTest.java | 44 ++
.../zookeeper/SingletonServiceImplTest.java | 243 +++++++
.../common/zookeeper/ZooKeeperClientTest.java | 210 ++++++
.../common/zookeeper/ZooKeeperUtilsTest.java | 139 ++++
config/findbugs/excludeFilter.xml | 8 -
docs/features/service-discovery.md | 2 +-
docs/reference/scheduler-configuration.md | 2 +
.../aurora/scheduler/SchedulerLifecycle.java | 6 +-
.../aurora/scheduler/app/SchedulerMain.java | 4 +-
.../scheduler/app/ServiceGroupMonitor.java | 46 ++
.../CommonsServiceDiscoveryModule.java | 102 +++
.../discovery/CommonsServiceGroupMonitor.java | 59 ++
.../aurora/scheduler/discovery/Credentials.java | 98 ---
.../CuratorServiceDiscoveryModule.java | 8 +-
.../discovery/CuratorServiceGroupMonitor.java | 1 +
.../discovery/CuratorSingletonService.java | 1 +
.../discovery/FlaggedZooKeeperConfig.java | 21 +-
.../aurora/scheduler/discovery/JsonCodec.java | 147 ----
.../discovery/ServiceDiscoveryModule.java | 20 +-
.../discovery/ServiceGroupMonitor.java | 46 --
.../scheduler/discovery/SingletonService.java | 114 ----
.../scheduler/discovery/ZooKeeperConfig.java | 21 +-
.../scheduler/discovery/ZooKeeperUtils.java | 51 --
.../discovery/testing/BaseZooKeeperTest.java | 53 --
.../discovery/testing/ZooKeeperTestServer.java | 101 ---
.../scheduler/http/JettyServerModule.java | 2 +-
.../aurora/scheduler/http/LeaderRedirect.java | 4 +-
.../log/mesos/MesosLogStreamModule.java | 4 +-
.../scheduler/SchedulerLifecycleTest.java | 4 +-
.../aurora/scheduler/app/SchedulerIT.java | 52 +-
.../discovery/AbstractDiscoveryModuleTest.java | 77 +++
.../discovery/BaseCuratorDiscoveryTest.java | 10 +-
.../discovery/CommonsDiscoveryModuleTest.java | 29 +
.../CommonsServiceGroupMonitorTest.java | 137 ++++
.../discovery/CuratorDiscoveryModuleTest.java | 64 +-
.../discovery/CuratorSingletonServiceTest.java | 3 +-
.../scheduler/discovery/JsonCodecTest.java | 159 -----
.../discovery/ZooKeeperConfigTest.java | 17 +-
.../scheduler/http/AbstractJettyTest.java | 15 +-
.../scheduler/http/LeaderRedirectTest.java | 4 +-
.../aurora/scheduler/thrift/ThriftIT.java | 2 +-
62 files changed, 4871 insertions(+), 902 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 96926f4..7a3d331 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -26,6 +26,7 @@
- The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are
upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below.
+=======
0.16.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f257440..2f23b85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -164,7 +164,6 @@ project(':commons') {
dependencies {
compile project(':commons-args')
- compile "ch.qos.logback:logback-classic:${logbackRev}"
compile "com.google.code.findbugs:jsr305:${jsrRev}"
compile "com.google.code.gson:gson:${gsonRev}"
compile "com.google.guava:guava:${guavaRev}"
@@ -175,13 +174,17 @@ project(':commons') {
compile "javax.servlet:javax.servlet-api:${servletRev}"
compile "joda-time:joda-time:2.9.1"
compile "org.antlr:stringtemplate:${stringTemplateRev}"
+ compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"
compile "org.easymock:easymock:3.4"
// There are a few testing support libs in the src/main/java trees that use junit - currently:
+ // src/main/java/org/apache/aurora/common/zookeeper/testing
// src/main/java/org/apache/aurora/common/testing
compile "junit:junit:${junitRev}"
testCompile "junit:junit:${junitRev}"
+ testCompile "org.powermock:powermock-module-junit4:1.6.4"
+ testCompile "org.powermock:powermock-api-easymock:1.6.4"
}
}
@@ -347,11 +350,9 @@ dependencies {
compile project(':commons')
compile project(':commons-args')
-
compile 'aopalliance:aopalliance:1.0'
- compile "ch.qos.logback:logback-classic:${logbackRev}"
+ compile 'ch.qos.logback:logback-classic:1.1.3'
compile "com.google.code.findbugs:jsr305:${jsrRev}"
- compile "com.google.code.gson:gson:${gsonRev}"
compile "com.google.inject:guice:${guiceRev}"
compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
compile "com.google.protobuf:protobuf-java:${protobufRev}"
@@ -385,15 +386,8 @@ dependencies {
compile 'org.quartz-scheduler:quartz:2.2.2'
compile "uno.perk:forward:1.0.0"
- // There are a few testing support libs in the src/main/java trees that use junit - currently:
- // src/main/java/org/apache/aurora/common/zookeeper/testing
- compile "junit:junit:${junitRev}"
-
testCompile "com.sun.jersey:jersey-client:${jerseyRev}"
testCompile "junit:junit:${junitRev}"
- testCompile "org.powermock:powermock-module-junit4:1.6.4"
- testCompile "org.powermock:powermock-api-easymock:1.6.4"
-
}
// For normal developer builds, avoid running the often-time-consuming code quality checks.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..75c1b14
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+  /**
+   * Synchronously asks ZooKeeper which member currently leads the group.
+   *
+   * @return the identifying data of the current group leader, or {@link Optional#absent()} when
+   *     the group has no leader
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading the leader information
+   * @throws InterruptedException if this thread is interrupted getting the leader
+   */
+  Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * A participant in leader elections that is told when it wins and when it loses leadership.
+   */
+  interface Leader {
+
+    /**
+     * Invoked once this leader has won an election.
+     *
+     * @param abdicate a command that can be used to abdicate leadership and force a new election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Invoked once this leader has been ousted, either because it abdicated or because an
+     * external event (session expiration) took its leadership role away.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Enters this candidate into leadership elections for as long as the current jvm process is
+   * alive. When the candidate is elected, the {@code onElected} callback runs and receives a
+   * command that can be used to abdicate leadership. A new leader is elected if the elected
+   * leader's jvm process dies or the elected leader successfully abdicates. A leader that
+   * successfully abdicates is removed from the group and is not eligible for election again
+   * unless {@link #offerLeadership(Leader)} is called again.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @return a supplier that can be queried to find out if this leader is currently elected
+   * @throws JoinException if there was a problem joining the group
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to join the group and determine initial
+   *     election results
+   */
+  Supplier<Boolean> offerLeadership(Leader leader)
+      throws JoinException, WatchException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..98b5ee4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements leader election for small groups of candidates. This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
+
+  /** Member data published when the local address cannot be determined. */
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  /**
+   * Supplies the member data for this candidate: the local host's address, or
+   * {@code UNKNOWN_CANDIDATE_DATA} when the local host cannot be resolved.
+   */
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
+    try {
+      // Encode with an explicit charset, as UNKNOWN_CANDIDATE_DATA does, instead of depending on
+      // the platform default charset of whichever jvm happens to run this candidate.
+      return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to determine local address!", e);
+      return UNKNOWN_CANDIDATE_DATA;
+    }
+  };
+
+  /** Selects the leader among candidate ids: the minimum id in natural (String) order. */
+  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+      candidates -> Ordering.natural().min(candidates);
+
+  private final Group group;
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.
+   *
+   * @param group the group whose members compete for leadership; must not be null
+   */
+  public CandidateImpl(Group group) {
+    this.group = Preconditions.checkNotNull(group);
+  }
+
+  /**
+   * Reads the current member ids from the group, picks the leader among them and fetches that
+   * member's data.
+   *
+   * @return the leader's member data, or {@link Optional#absent()} if the group has no members
+   */
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  /**
+   * Joins the group, then watches its membership to detect when this candidate gains or loses
+   * leadership.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @return a supplier that reports {@code true} while this candidate was last observed as the
+   *     elected leader and has not abdicated
+   */
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    // leader::onDefeated is handed to the group as the loss-of-membership callback.
+    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+    group.watch(memberIds -> {
+      boolean noCandidates = Iterables.isEmpty(memberIds);
+      String memberId = membership.getMemberId();
+
+      if (noCandidates) {
+        LOG.warn("All candidates have temporarily left the group: " + group);
+      } else if (!Iterables.contains(memberIds, memberId)) {
+        LOG.error(
+            "Current member ID {} is not a candidate for leader, current voting: {}",
+            memberId, memberIds);
+      } else {
+        boolean electedLeader = memberId.equals(getLeader(memberIds));
+        boolean previouslyElected = elected.getAndSet(electedLeader);
+
+        if (!previouslyElected && electedLeader) {
+          LOG.info("Candidate {} is now leader of group: {}",
+              membership.getMemberPath(), memberIds);
+
+          // The abdicate command cancels the group membership and then marks this candidate as
+          // abdicated so the returned supplier stops reporting leadership.
+          leader.onElected(() -> {
+            membership.cancel();
+            abdicated.set(true);
+          });
+        } else if (!electedLeader) {
+          if (previouslyElected) {
+            leader.onDefeated();
+          }
+          LOG.info(
+              "Candidate {} waiting for the next leader election, current voting: {}",
+              membership.getMemberPath(), memberIds);
+        }
+      }
+    });
+
+    return () -> !abdicated.get() && elected.get();
+  }
+
+  /**
+   * Picks the leader among the given member ids.
+   *
+   * @param memberIds the ids of the current group members
+   * @return the leader's member id, or {@code null} when {@code memberIds} is empty
+   */
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
new file mode 100644
index 0000000..18319a3
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.commons.lang.builder.EqualsBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulates a user's ZooKeeper credentials.
+ */
+public final class Credentials {
+
+  /**
+   * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
+   *
+   * @param username the username to authenticate with
+   * @param password the password to authenticate with
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials digestCredentials(String username, String password) {
+    MorePreconditions.checkNotBlank(username);
+    Preconditions.checkNotNull(password);
+
+    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+    // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+    // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+    // Charset explicitly.
+    return new Credentials("digest", (username + ":" + password).getBytes());
+  }
+
+  private final String scheme;
+  private final byte[] authToken;
+
+  /**
+   * Creates credentials for an arbitrary authentication scheme.
+   *
+   * @param scheme the authentication scheme; checked with {@code MorePreconditions.checkNotBlank}
+   * @param authToken the authentication token; must not be null
+   */
+  public Credentials(String scheme, byte[] authToken) {
+    this.scheme = MorePreconditions.checkNotBlank(scheme);
+    this.authToken = requireNonNull(authToken);
+  }
+
+  /**
+   * Returns the authentication scheme these credentials are for.
+   *
+   * @return the scheme these credentials are for.
+   */
+  public String scheme() {
+    return scheme;
+  }
+
+  /**
+   * Returns the authentication token.
+   *
+   * @return the authentication token.
+   */
+  public byte[] authToken() {
+    return authToken;
+  }
+
+  /**
+   * Two credentials are equal when their schemes are equal and their tokens hold the same bytes.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof Credentials)) {
+      return false;
+    }
+
+    Credentials other = (Credentials) o;
+    return new EqualsBuilder()
+        .append(scheme, other.scheme())
+        .append(authToken, other.authToken())
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the token by content so that hashCode agrees with equals, which compares the token
+    // arrays by content. Passing the array itself would hash its identity, giving two equal
+    // credentials different hash codes.
+    return Objects.hashCode(scheme, java.util.Arrays.hashCode(authToken));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
new file mode 100644
index 0000000..2720dd1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups. The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+ private static final Logger LOG = LoggerFactory.getLogger(Group.class);
+
+ private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+ private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+ private final ZooKeeperClient zkClient;
+ private final ImmutableList<ACL> acl;
+ private final String path;
+
+ private final NodeScheme nodeScheme;
+ private final Predicate<String> nodeNameFilter;
+
+ private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a group rooted at the given {@code path}. The path must be absolute; trailing or
+   * duplicate slashes are normalized away. For example, each of the following paths creates a
+   * group at the normalized path /my/distributed/group:
+   * <ul>
+   *   <li>/my/distributed/group
+   *   <li>/my/distributed/group/
+   *   <li>/my/distributed//group
+   * </ul>
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the absolute persistent path that represents this group
+   * @param nodeScheme the scheme that defines how nodes are created
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+    Preconditions.checkNotNull(zkClient);
+    this.zkClient = zkClient;
+    this.acl = ImmutableList.copyOf(acl);
+
+    Preconditions.checkNotNull(path);
+    this.path = ZooKeeperUtils.normalizePath(path);
+
+    Preconditions.checkNotNull(nodeScheme);
+    this.nodeScheme = nodeScheme;
+    // Child nodes count as group members only when the node scheme recognizes their name.
+    this.nodeNameFilter = nodeScheme::isMember;
+
+    this.backoffHelper = new BackoffHelper();
+  }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
+ * {@code namePrefix} of 'member_'.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param acl the ACL to use for creating the persistent group path if it does not already exist
+ * @param path the absolute persistent path that represents this group
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+ this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+ }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
+ * {@link DefaultScheme} using {@code namePrefix}.
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param acl the ACL to use for creating the persistent group path if it does not already exist
+ * @param path the absolute persistent path that represents this group
+ * @param namePrefix the prefix handed to the {@link DefaultScheme} that names member nodes
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+ this(zkClient, acl, path, new DefaultScheme(namePrefix));
+ }
+
+  /**
+   * Builds the full path of a member node from this group's path and the member's id.
+   *
+   * @param memberId the id of the member; validated with {@code MorePreconditions.checkNotBlank}
+   * @return this group's path, a slash, and the member id
+   */
+  public String getMemberPath(String memberId) {
+    String checkedId = MorePreconditions.checkNotBlank(memberId);
+    return path + "/" + checkedId;
+  }
+
+ /**
+ * Returns the normalized path this group is rooted at.
+ */
+ public String getPath() {
+ return path;
+ }
+
+  /**
+   * Extracts the member id (the last path segment) from the path of one of this group's nodes.
+   *
+   * @param nodePath the full path of a member node under this group's path
+   * @return the member id, i.e. the text after the last slash of {@code nodePath}
+   * @throws IllegalArgumentException if {@code nodePath} does not start with this group's path
+   *     followed by a slash, or if its last segment is not a member name under the node scheme
+   */
+  public String getMemberId(String nodePath) {
+    MorePreconditions.checkNotBlank(nodePath);
+
+    String groupPrefix = path + "/";
+    boolean underGroup = nodePath.startsWith(groupPrefix);
+    Preconditions.checkArgument(
+        underGroup, "Not a member of this group[%s]: %s", path, nodePath);
+
+    String nodeName = StringUtils.substringAfterLast(nodePath, "/");
+    boolean recognized = nodeScheme.isMember(nodeName);
+    Preconditions.checkArgument(recognized, "Not a group member: %s", nodeName);
+    return nodeName;
+  }
+
+  /**
+   * Synchronously lists the children of this group's path in ZooKeeper and keeps only those the
+   * node scheme recognizes as group members.
+   *
+   * @return the ids of all the present members of this group
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this group's member ids
+   * @throws InterruptedException if this thread is interrupted listing the group members
+   */
+  public Iterable<String> getMemberIds()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    List<String> children = zkClient.get().getChildren(path, false);
+    return Iterables.filter(children, nodeNameFilter);
+  }
+
+ /**
+ * Gets the data for one of this group's members by querying ZooKeeper synchronously.
+ *
+ * @param memberId the id of the member whose data to retrieve
+ * @return the data associated with the {@code memberId}
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading this member's data
+ * @throws InterruptedException if this thread is interrupted retrieving the member data
+ */
+ public byte[] getMemberData(String memberId)
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+ // NOTE(review): the false/null arguments presumably mean "set no watch" and "no Stat to
+ // populate" - confirm against the ZooKeeper client API.
+ return zkClient.get().getData(getMemberPath(memberId), false, null);
+ }
+
+ /**
+ * Represents membership in a distributed group.
+ */
+ public interface Membership {
+
+ /**
+ * Returns the persistent ZooKeeper path that represents this group.
+ */
+ String getGroupPath();
+
+ /**
+ * Returns the id (ZooKeeper node name) of this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberId();
+
+ /**
+ * Returns the full ZooKeeper path to this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberPath();
+
+ /**
+ * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
+ * {@link Group#join(Supplier, Command)} (or to one of its overloads).
+ *
+ * @return the new membership data
+ * @throws UpdateException if there was a problem updating the membership data
+ */
+ byte[] updateMemberData() throws UpdateException;
+
+ /**
+ * Cancels group membership by deleting the associated ZooKeeper member node.
+ *
+ * @throws JoinException if there is a problem deleting the node
+ */
+ void cancel() throws JoinException;
+ }
+
+ /**
+ * Indicates an error joining a group.
+ */
+ public static class JoinException extends Exception {
+ /**
+ * @param message a description of what went wrong
+ * @param cause the underlying failure
+ */
+ public JoinException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Indicates an error updating a group member's data.
+ */
+ public static class UpdateException extends Exception {
+ /**
+ * @param message a description of what went wrong
+ * @param cause the underlying failure
+ */
+ public UpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Joins this group with no member data and no loss-of-membership callback. Equivalent to calling
+ * {@code join(NO_MEMBER_DATA, null)}, where {@code NO_MEMBER_DATA} is a supplier that always
+ * returns {@code null}.
+ */
+ public final Membership join() throws JoinException, InterruptedException {
+ return join(NO_MEMBER_DATA, null);
+ }
+
+ /**
+ * Equivalent to calling {@code join(memberData, null)}.
+ *
+ * @param memberData a supplier of the data to store in the member node
+ */
+ public final Membership join(Supplier<byte[]> memberData)
+ throws JoinException, InterruptedException {
+
+ return join(memberData, null);
+ }
+
+ /**
+ * Joins this group with no member data. Equivalent to calling
+ * {@code join(NO_MEMBER_DATA, onLoseMembership)}, where {@code NO_MEMBER_DATA} is a supplier that
+ * always returns {@code null}.
+ *
+ * @param onLoseMembership a callback to notify when membership is lost; may be null
+ */
+ public final Membership join(@Nullable final Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ return join(NO_MEMBER_DATA, onLoseMembership);
+ }
+
+ /**
+ * Joins this group and returns the resulting Membership when successful. Membership will be
+ * automatically cancelled when the current jvm process dies; however the returned Membership
+ * object can be used to cancel membership earlier. Unless
+ * {@link Group.Membership#cancel()} is called the membership will
+ * be maintained by re-establishing it silently in the background.
+ *
+ * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an
+ * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
+ * membership in the group.
+ *
+ * @param memberData a supplier of the data to store in the member node; must not be null
+ * @param onLoseMembership a callback to notify when membership is lost; may be null
+ * @return a Membership object with the member details
+ * @throws JoinException if there was a problem joining the group
+ * @throws InterruptedException if this thread is interrupted awaiting completion of the join
+ */
+ public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ Preconditions.checkNotNull(memberData);
+ // The persistent group path must exist before a member node is created for this membership.
+ ensurePersistentGroupPath();
+
+ final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
+ // NOTE(review): a null result presumably tells doUntilResult to back off and try again -
+ // confirm against BackoffHelper.
+ return backoffHelper.doUntilResult(() -> {
+ try {
+ return groupJoiner.join();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to join group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Temporary error trying to join group at path: " + path, e);
+ return null;
+ } catch (KeeperException e) {
+ // Only errors the client classifies as retryable are treated as temporary.
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error trying to join group at path: " + path, e);
+ return null;
+ } else {
+ throw new JoinException("Problem joining partition group at path: " + path, e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Ensures this group's path exists in ZooKeeper by calling {@code ZooKeeperUtils.ensurePath}
+ * with this group's ACL.
+ *
+ * @throws JoinException if the attempt is interrupted or fails with a non-retryable error
+ * @throws InterruptedException declared by the backoff helper call
+ */
+ private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+ // NOTE(review): returning false presumably tells doUntilSuccess to back off and try again -
+ // confirm against BackoffHelper.
+ backoffHelper.doUntilSuccess(() -> {
+ try {
+ ZooKeeperUtils.ensurePath(zkClient, acl, path);
+ return true;
+ } catch (InterruptedException e) {
+ // Restore the interrupt status before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ // Only errors the client classifies as retryable are treated as temporary.
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error ensuring path: " + path, e);
+ return false;
+ } else {
+ throw new JoinException("Problem ensuring group at path: " + path, e);
+ }
+ }
+ });
+ }
+
+ private class ActiveMembership implements Membership {
+ private final Supplier<byte[]> memberData;
+ private final Command onLoseMembership;
+ private String nodePath;
+ private String memberId;
+ private volatile boolean cancelled;
+ private byte[] membershipData;
+
+    /**
+     * Creates a membership that has not joined the group yet.
+     *
+     * @param memberData a supplier of the data to store in the member node
+     * @param onLoseMembership a callback to notify when membership is lost; {@code null} is
+     *     replaced with a no-op command
+     */
+    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+      this.memberData = memberData;
+      if (onLoseMembership != null) {
+        this.onLoseMembership = onLoseMembership;
+      } else {
+        this.onLoseMembership = Commands.NOOP;
+      }
+    }
+
+ // Returns the path of the enclosing group.
+ @Override
+ public String getGroupPath() {
+ return path;
+ }
+
+ // Reads the current member id while holding this membership's monitor.
+ @Override
+ public synchronized String getMemberId() {
+ return memberId;
+ }
+
+ // Reads the current member node path while holding this membership's monitor.
+ @Override
+ public synchronized String getMemberPath() {
+ return nodePath;
+ }
+
+    /**
+     * Fetches fresh data from the member data supplier and, if it differs from the data last
+     * written by this method, writes it to this member's node.
+     *
+     * @return the data obtained from the supplier
+     * @throws UpdateException if ZooKeeper rejects the write, the connection cannot be
+     *     established, or the calling thread is interrupted (its interrupt status is restored)
+     */
+    @Override
+    public synchronized byte[] updateMemberData() throws UpdateException {
+      byte[] latestData = memberData.get();
+      if (!ArrayUtils.isEquals(membershipData, latestData)) {
+        try {
+          zkClient.get().setData(nodePath, latestData, ZooKeeperUtils.ANY_VERSION);
+          // Remember what was written only after the write succeeded.
+          membershipData = latestData;
+        } catch (KeeperException e) {
+          throw new UpdateException("Problem updating membership data.", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so callers further up the stack can still observe the
+          // interruption, as the other interrupt handlers in this class do.
+          Thread.currentThread().interrupt();
+          throw new UpdateException("Interrupted attempting to update membership data.", e);
+        } catch (ZooKeeperConnectionException e) {
+          throw new UpdateException(
+              "Could not connect to the ZooKeeper cluster to update membership data.", e);
+        }
+      }
+      return latestData;
+    }
+
+ @Override
+ public synchronized void cancel() throws JoinException {
+ if (!cancelled) {
+ try {
+ backoffHelper.doUntilSuccess(() -> {
+ try {
+ zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (NoNodeException e) {
+ LOG.info("Membership already cancelled, node at path: " + nodePath +
+ " has been deleted");
+ return true;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error cancelling membership: " + nodePath, e);
+ return false;
+ } else {
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ });
+ cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ }
+
+ private class CancelledException extends IllegalStateException { /* marker */ }
+
+ synchronized Membership join()
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ if (cancelled) {
+ throw new CancelledException();
+ }
+
+ if (nodePath == null) {
+ // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+ // registered once.
+ zkClient.registerExpirationHandler(this::tryJoin);
+ }
+
+ byte[] membershipData = memberData.get();
+ String nodeName = nodeScheme.createName(membershipData);
+ CreateMode createMode = nodeScheme.isSequential()
+ ? CreateMode.EPHEMERAL_SEQUENTIAL
+ : CreateMode.EPHEMERAL;
+ nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+ memberId = Group.this.getMemberId(nodePath);
+ LOG.info("Set group member ID to " + memberId);
+ this.membershipData = membershipData;
+
+ // Re-join if our ephemeral node goes away due to maliciousness.
+ zkClient.get().exists(nodePath, event -> {
+ if (event.getType() == EventType.NodeDeleted) {
+ tryJoin();
+ }
+ });
+
+ return this;
+ }
+
+ private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+ () -> {
+ try {
+ join();
+ return true;
+ } catch (CancelledException e) {
+ // Lost a cancel race - that's ok.
+ return true;
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error re-joining group: " + path, e);
+ return false;
+ } else {
+ throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+ }
+ }
+ };
+
+ private synchronized void tryJoin() {
+ onLoseMembership.execute();
+ try {
+ backoffHelper.doUntilSuccess(tryJoin);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
+ }
+ }
+ }
+
+ /**
+ * An interface to an object that listens for changes to a group's membership.
+ *
+ * Listeners are registered through {@code watch}; per that method's contract the first
+ * notification is delivered before {@code watch} returns and later ones arrive on a background
+ * thread.
+ */
+ public interface GroupChangeListener {
+
+ /**
+ * Called whenever group membership changes with the new list of member ids.
+ *
+ * @param memberIds the current member ids
+ */
+ void onGroupChange(Iterable<String> memberIds);
+ }
+
+ /**
+ * An interface that dictates the scheme to use for storing and filtering nodes that represent
+ * members of a distributed group.
+ */
+ public interface NodeScheme {
+ /**
+ * Determines if a child node is a member of a group by examining the node's name.
+ *
+ * @param nodeName the name of a child node found in a group
+ * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+ */
+ boolean isMember(String nodeName);
+
+ /**
+ * Generates a node name for the node representing this process in the distributed group.
+ * When joining, the group appends this name to the group path after a {@code "/"} separator.
+ *
+ * @param membershipData the data that will be stored in this node
+ * @return the name for the node that will represent this process in the group
+ */
+ String createName(byte[] membershipData);
+
+ /**
+ * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+ * The group uses the answer to pick the ZooKeeper create mode for member nodes.
+ *
+ * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+ */
+ boolean isSequential();
+ }
+
+ /**
+ * Indicates an error watching a group.
+ */
+ public static class WatchException extends Exception {
+ /**
+ * @param message a description of what went wrong while setting up the watch
+ * @param cause the underlying failure
+ */
+ public WatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+  * Watches this group for the lifetime of this jvm process. This method will block until the
+  * current group members are available, notify the {@code groupChangeListener} and then return.
+  * All further changes to the group membership will cause notifications on a background thread.
+  *
+  * @param groupChangeListener the listener to notify of group membership change events
+  * @return A command which, when executed, will stop watching the group.
+  * @throws WatchException if there is a problem generating the 1st group membership list
+  * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+  */
+ public final Command watch(final GroupChangeListener groupChangeListener)
+     throws WatchException, InterruptedException {
+   Preconditions.checkNotNull(groupChangeListener);
+
+   try {
+     ensurePersistentGroupPath();
+   } catch (JoinException e) {
+     throw new WatchException("Failed to create group path: " + path, e);
+   }
+
+   final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+   backoffHelper.doUntilSuccess(() -> {
+     try {
+       groupMonitor.watchGroup();
+       return true;
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+       throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+     } catch (ZooKeeperConnectionException e) {
+       LOG.warn("Temporary error trying to watch group at path: " + path, e);
+       // Signal "retry" with false, like the other doUntilSuccess callers, instead of a null
+       // Boolean.
+       return false;
+     } catch (KeeperException e) {
+       if (zkClient.shouldRetry(e)) {
+         LOG.warn("Temporary error trying to watch group at path: " + path, e);
+         return false;
+       } else {
+         throw new WatchException("Problem trying to watch group at path: " + path, e);
+       }
+     }
+   });
+   return groupMonitor::stopWatching;
+ }
+
+ /**
+  * Helps continuously monitor a group for membership changes.
+  */
+ private class GroupMonitor {
+   private final GroupChangeListener groupChangeListener;
+   private volatile boolean stopped = false;
+   private Set<String> members;
+
+   GroupMonitor(GroupChangeListener groupChangeListener) {
+     this.groupChangeListener = groupChangeListener;
+   }
+
+   // Passed to getChildren; a children-changed event triggers a fresh read, which also sets the
+   // watcher again.
+   private final Watcher groupWatcher = event -> {
+     if (event.getType() == EventType.NodeChildrenChanged) {
+       tryWatchGroup();
+     }
+   };
+
+   // One re-watch attempt in the form the backoff helper expects: true ends the retry loop,
+   // false asks for another attempt.
+   private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+       () -> {
+         try {
+           watchGroup();
+           return true;
+         } catch (ZooKeeperConnectionException e) {
+           LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+           return false;
+         } catch (KeeperException e) {
+           if (zkClient.shouldRetry(e)) {
+             LOG.warn("Temporary error re-watching group: " + path, e);
+             return false;
+           } else {
+             throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+           }
+         }
+       };
+
+   /**
+    * Re-reads the group and re-sets the watch, retrying temporary failures. Does nothing once
+    * the monitor has been stopped.
+    */
+   private void tryWatchGroup() {
+     if (stopped) {
+       return;
+     }
+
+     try {
+       backoffHelper.doUntilSuccess(tryWatchGroup);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(
+           String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+     }
+   }
+
+   /**
+    * Reads the group's children with {@code groupWatcher} set on them and publishes the
+    * children accepted by {@code nodeNameFilter} as the current members.
+    */
+   private void watchGroup()
+       throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+     if (stopped) {
+       return;
+     }
+
+     List<String> children = zkClient.get().getChildren(path, groupWatcher);
+     setMembers(Iterables.filter(children, nodeNameFilter));
+   }
+
+   private void stopWatching() {
+     // TODO(William Farner): Cancel the watch when
+     // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+     LOG.info("Stopping watch on " + this);
+     stopped = true;
+   }
+
+   /**
+    * Records the given members and notifies the listener when they differ from the previously
+    * recorded set. Updates are dropped once the monitor has been stopped.
+    *
+    * @param members the member ids currently present in the group
+    */
+   synchronized void setMembers(Iterable<String> members) {
+     if (stopped) {
+       LOG.info("Suppressing membership update, no longer watching " + this);
+       return;
+     }
+
+     if (this.members == null) {
+       // Reset our watch on the group if session expires - only needs to be registered once.
+       zkClient.registerExpirationHandler(this::tryWatchGroup);
+     }
+
+     Set<String> membership = ImmutableSet.copyOf(members);
+     if (!membership.equals(this.members)) {
+       // Hand out the immutable snapshot that was compared, not the caller's lazy view, so the
+       // listener sees exactly the set recorded below.
+       groupChangeListener.onGroupChange(membership);
+       this.members = membership;
+     }
+   }
+ }
+
+ /**
+ * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+ * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+ * prefix is "member_", the node's full path will look something like
+ * {@code /discovery/servicename/member_0000000007}.
+ */
+ public static class DefaultScheme implements NodeScheme {
+ private final String namePrefix;
+ private final Pattern namePattern;
+
+ /**
+ * Creates a sequential node scheme based on the given node name prefix.
+ *
+ * @param namePrefix the prefix for the names of the member nodes
+ */
+ public DefaultScheme(String namePrefix) {
+ this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+ namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+ }
+
+ @Override
+ public boolean isMember(String nodeName) {
+ return namePattern.matcher(nodeName).matches();
+ }
+
+ @Override
+ public String createName(byte[] membershipData) {
+ return namePrefix;
+ }
+
+ @Override
+ public boolean isSequential() {
+ return true;
+ }
+ }
+
+ /** Identifies this group by its ZooKeeper path. */
+ @Override
+ public String toString() {
+   return String.format("Group %s", path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
new file mode 100644
index 0000000..9d31608
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonParseException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import static java.util.Objects.requireNonNull;
+
+ /**
+  * Encodes and decodes {@link ServiceInstance} structs as UTF-8 JSON using Gson.
+  */
+ class JsonCodec implements Codec<ServiceInstance> {
+
+   /**
+    * Rejects a decoded schema whose required field is absent.
+    *
+    * @throws JsonParseException if {@code fieldValue} is {@code null}
+    */
+   private static void assertRequiredField(String fieldName, Object fieldValue) {
+     if (fieldValue == null) {
+       throw new JsonParseException(String.format("Field %s is required", fieldName));
+     }
+   }
+
+   /** The JSON shape of an {@link Endpoint}. */
+   private static class EndpointSchema {
+     private final String host;
+     private final Integer port;
+
+     EndpointSchema(Endpoint endpoint) {
+       host = endpoint.getHost();
+       port = endpoint.getPort();
+     }
+
+     /** Converts back to an {@link Endpoint}; both {@code host} and {@code port} are required. */
+     Endpoint asEndpoint() {
+       assertRequiredField("host", host);
+       assertRequiredField("port", port);
+
+       return new Endpoint(host, port);
+     }
+   }
+
+   /** The JSON shape of a {@link ServiceInstance}. */
+   private static class ServiceInstanceSchema {
+     private final EndpointSchema serviceEndpoint;
+     private final Map<String, EndpointSchema> additionalEndpoints;
+     private final Status status;
+     private final @Nullable Integer shard;
+
+     ServiceInstanceSchema(ServiceInstance instance) {
+       serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+       if (instance.isSetAdditionalEndpoints()) {
+         additionalEndpoints =
+             Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+       } else {
+         additionalEndpoints = ImmutableMap.of();
+       }
+       status = instance.getStatus();
+       shard = instance.isSetShard() ? instance.getShard() : null;
+     }
+
+     /**
+      * Converts back to a {@link ServiceInstance}. {@code serviceEndpoint} and {@code status}
+      * are required; {@code additionalEndpoints} and {@code shard} may be absent.
+      *
+      * @throws JsonParseException if a required field is missing anywhere in the schema
+      */
+     ServiceInstance asServiceInstance() {
+       assertRequiredField("serviceEndpoint", serviceEndpoint);
+       assertRequiredField("status", status);
+
+       ServiceInstance instance =
+           new ServiceInstance(serviceEndpoint.asEndpoint(), decodeAdditionalEndpoints(), status);
+       if (shard != null) {
+         instance.setShard(shard);
+       }
+       return instance;
+     }
+
+     /**
+      * Converts every additional endpoint eagerly, so a malformed entry is reported here, while
+      * the caller can still translate the failure, rather than on a later read of the map.
+      */
+     private Map<String, Endpoint> decodeAdditionalEndpoints() {
+       if (additionalEndpoints == null) {
+         return ImmutableMap.of();
+       }
+
+       ImmutableMap.Builder<String, Endpoint> endpoints = ImmutableMap.builder();
+       for (Map.Entry<String, EndpointSchema> entry : additionalEndpoints.entrySet()) {
+         // A JSON null for an entry would otherwise surface as a NullPointerException.
+         assertRequiredField("additionalEndpoints." + entry.getKey(), entry.getValue());
+         endpoints.put(entry.getKey(), entry.getValue().asEndpoint());
+       }
+       return endpoints.build();
+     }
+   }
+
+   private static final Charset ENCODING = Charsets.UTF_8;
+
+   private final Gson gson;
+
+   JsonCodec() {
+     this(new Gson());
+   }
+
+   JsonCodec(Gson gson) {
+     this.gson = requireNonNull(gson);
+   }
+
+   /**
+    * Writes {@code instance} to {@code sink} as JSON and flushes the writer; the sink itself is
+    * not closed.
+    *
+    * @throws IOException if Gson reports a problem writing the JSON
+    */
+   @Override
+   public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+     Writer writer = new OutputStreamWriter(sink, ENCODING);
+     try {
+       gson.toJson(new ServiceInstanceSchema(instance), writer);
+     } catch (JsonIOException e) {
+       throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
+     }
+     writer.flush();
+   }
+
+   /**
+    * Reads one JSON-encoded {@link ServiceInstance} from {@code source}.
+    *
+    * @throws IOException if the stream holds no JSON object, the JSON cannot be parsed, or a
+    *     required field is missing
+    */
+   @Override
+   public ServiceInstance deserialize(InputStream source) throws IOException {
+     InputStreamReader reader = new InputStreamReader(source, ENCODING);
+     try {
+       @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
+       if (schema == null) {
+         throw new IOException("JSON did not include a ServiceInstance object");
+       }
+       return schema.asServiceInstance();
+     } catch (JsonParseException e) {
+       throw new IOException("Problem parsing JSON ServiceInstance.", e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..aeea02d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+
+/**
+ * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a
+ * common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
+ * rendezvous data to zookeeper so that standard clients can interoperate.
+ */
+public interface ServerSet {
+
+ /**
+ * Encodes a {@link ServiceInstance} as a JSON object.
+ *
+ * This is the default encoding for service instance data in ZooKeeper.
+ */
+ Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+ /**
+ * Attempts to join a server set for this logical service group.
+ *
+ * @param endpoint the primary service endpoint
+ * @param additionalEndpoints and additional endpoints keyed by their logical name
+ * @return an EndpointStatus object that allows the endpoint to adjust its status
+ * @throws JoinException if there was a problem joining the server set
+ * @throws InterruptedException if interrupted while waiting to join the server set
+ */
+ EndpointStatus join(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints)
+ throws JoinException, InterruptedException;
+
+ /**
+ * A handle to a service endpoint's status data that allows updating it to track current events.
+ */
+ interface EndpointStatus {
+
+ /**
+ * Removes the endpoint from the server set.
+ *
+ * @throws UpdateException if there was a problem leaving the ServerSet.
+ */
+ void leave() throws UpdateException;
+ }
+
+ /**
+ * Indicates an error updating a service's status information.
+ */
+ class UpdateException extends Exception {
+ public UpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ace4980
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
+ */
+public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  /**
+   * Joins the underlying group, advertising the given endpoints with status {@code ALIVE}.
+   *
+   * @return a handle whose {@code leave()} cancels the group membership created here
+   */
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
+    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
+    Group.Membership membership = group.join(serviceInstanceSupplier);
+
+    return () -> memberStatus.leave(membership);
+  }
+
+  /**
+   * Starts watching the server set and reports changes to {@code monitor}.
+   *
+   * @return a command that stops the watch
+   * @throws MonitorException if the watch cannot be established or the calling thread is
+   *     interrupted; in the latter case the thread's interrupt flag is restored before throwing
+   */
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (Group.WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      // The checked InterruptedException is being translated, so keep the flag set for callers.
+      Thread.currentThread().interrupt();
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  /** Holds the endpoints one member advertises and produces the bytes stored in its node. */
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+    }
+
+    /**
+     * Cancels the given membership.
+     *
+     * @throws UpdateException if the membership could not be cancelled
+     */
+    synchronized void leave(Group.Membership membership) throws UpdateException {
+      try {
+        membership.cancel();
+      } catch (Group.JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD status", e);
+      }
+    }
+
+    /**
+     * Builds an {@code ALIVE} {@link ServiceInstance} for this member's endpoints and encodes
+     * it with the server set's codec.
+     */
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
+            serviceInstance + " to a byte[]", e);
+      }
+    }
+  }
+
+  /** Signals that a member's data could not be fetched or decoded. */
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /** Signals that a member's node no longer exists; the message is the node path. */
+  private static class ServiceInstanceDeletedException extends RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  /**
+   * Turns group membership notifications into {@link ServiceInstance} sets for one monitor,
+   * caching the decoded instance of each member by member id.
+   */
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    /**
+     * Registers a session-expiration handler that rebuilds the server set, then starts watching
+     * the group. The handler is unregistered if the watch cannot be established and again when
+     * the returned command is executed.
+     *
+     * @return a command that stops the group watch and unregisters the expiration handler
+     */
+    public Command watch() throws Group.WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
+
+      final Command stopGroupWatch;
+      try {
+        stopGroupWatch = group.watch(this::notifyGroupChange);
+      } catch (Group.WatchException | InterruptedException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+
+      return () -> {
+        stopGroupWatch.execute();
+        // Without this the handler stays registered after the watch is stopped and can still
+        // rebuild the server set and notify the monitor on a later session expiry.
+        zkClient.unregister(onExpirationWatcher);
+      };
+    }
+
+    /**
+     * Reads and decodes the data stored at {@code nodePath}, retrying temporary failures.
+     *
+     * @throws ServiceInstanceDeletedException if the node no longer exists
+     * @throws ServiceInstanceFetchException if the read is interrupted, fails permanently, or
+     *     the data cannot be decoded
+     */
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        return backoffHelper.doUntilResult(() -> {
+          try {
+            byte[] data = zkClient.get().getData(nodePath, false, null);
+            return ServerSets.deserializeServiceInstance(data, codec);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ServiceInstanceFetchException(
+                "Interrupted updating service data for: " + nodePath, e);
+          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            LOG.warn("Temporary error trying to updating service data for: " + nodePath, e);
+            return null;
+          } catch (NoNodeException e) {
+            invalidateNodePath(nodePath);
+            throw new ServiceInstanceDeletedException(nodePath);
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
+              return null;
+            } else {
+              throw new ServiceInstanceFetchException(
+                  "Failed to update service data for: " + nodePath, e);
+            }
+          } catch (IOException e) {
+            throw new ServiceInstanceFetchException(
+                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    // Decoded instances keyed by member id; a miss loads the member's node from ZooKeeper.
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    /** Drops every cached instance and re-fetches the members that were cached. */
+    private void rebuildServerSet() {
+      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    /** Evicts the cache entry for the member whose node lived at {@code deletedPath}. */
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    // Maps a member id to its instance, or to null when the member's node has been deleted.
+    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
+        memberId -> {
+          // This get will trigger a fetch
+          try {
+            return servicesByMemberId.getUnchecked(memberId);
+          } catch (UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (!(cause instanceof ServiceInstanceDeletedException)) {
+              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+              throw new IllegalStateException(
+                  "Unexpected error fetching member data for: " + memberId, e);
+            }
+            return null;
+          }
+        };
+
+    /**
+     * Reconciles the cache with the given member ids and, when the membership changed or no
+     * server set has been seen yet, publishes the resulting server set.
+     */
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    /** Records {@code currentServerSet} and notifies the monitor when it differs from the last. */
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+      // if the server's status has not changed, we can skip any onChange updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warn("server set empty for path " + group.getPath());
+        } else {
+          if (serverSet == null) {
+            LOG.info("received initial membership {}", currentServerSet);
+          } else {
+            logChange(currentServerSet);
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    /** Logs which instances left and joined between the recorded set and {@code newServerSet}. */
+    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.info(message.toString());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..01a54a5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Common ServerSet related functions
+ */
+public class ServerSets {
+
+ private ServerSets() {
+ // Utility class.
+ }
+
+ /**
+ * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+ */
+ public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+ ServerSets::toEndpoint;
+
+ /**
+ * Creates a server set that registers at a single path applying the given ACL to all nodes
+ * created in the path.
+ *
+ * @param zkClient ZooKeeper client to register with.
+ * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
+ * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set)
+ * @return A server set that registers at {@code zkPath}.
+ */
+ public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+ Preconditions.checkNotNull(zkClient);
+ MorePreconditions.checkNotBlank(acl);
+ MorePreconditions.checkNotBlank(zkPath);
+
+ return new ServerSetImpl(zkClient, acl, zkPath);
+ }
+
+ /**
+ * Returns a serialized Thrift service instance object, with given endpoints and codec.
+ *
+ * @param serviceInstance the Thrift service instance object to be serialized
+ * @param codec the codec to use to serialize a Thrift service instance object
+ * @return byte array that contains a serialized Thrift service instance
+ */
+ public static byte[] serializeServiceInstance(
+ ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ codec.serialize(serviceInstance, output);
+ return output.toByteArray();
+ }
+
+ /**
+ * Serializes a service instance based on endpoints.
+ * @see #serializeServiceInstance(ServiceInstance, Codec)
+ *
+ * @param address the target address of the service instance
+ * @param additionalEndpoints additional endpoints of the service instance
+ * @param status service status
+ */
+ public static byte[] serializeServiceInstance(
+ InetSocketAddress address,
+ Map<String, Endpoint> additionalEndpoints,
+ Status status,
+ Codec<ServiceInstance> codec) throws IOException {
+
+ ServiceInstance serviceInstance =
+ new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+ return serializeServiceInstance(serviceInstance, codec);
+ }
+
+ /**
+ * Creates a service instance object deserialized from byte array.
+ *
+ * @param data the byte array contains a serialized Thrift service instance
+ * @param codec the codec to use to deserialize the byte array
+ */
+ public static ServiceInstance deserializeServiceInstance(
+ byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+ return codec.deserialize(new ByteArrayInputStream(data));
+ }
+
+ /**
+ * Creates an endpoint for the given InetSocketAddress.
+ *
+ * @param address the target address to create the endpoint for
+ */
+ public static Endpoint toEndpoint(InetSocketAddress address) {
+ return new Endpoint(address.getHostName(), address.getPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..7f962eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
+ *
+ * <p>A caller offers itself through {@link #lead}; the supplied {@link LeadershipListener} is
+ * handed a {@link LeaderControl} when it is elected and is told when it has been defeated.
+ */
+public interface SingletonService {
+
+ /**
+ * Indicates an error attempting to lead a group of servers.
+ */
+ class LeadException extends Exception {
+ public LeadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Indicates an error attempting to advertise leadership of a group of servers.
+ */
+ class AdvertiseException extends Exception {
+ public AdvertiseException(String message) {
+ super(message);
+ }
+
+ public AdvertiseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
+ */
+ class LeaveException extends Exception {
+ public LeaveException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Attempts to lead the singleton service.
+ *
+ * @param endpoint The primary endpoint to register as a leader candidate in the service.
+ * @param additionalEndpoints Additional endpoints that are available on the host.
+ * @param listener Handler to call when the candidate is elected or defeated.
+ * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
+ * @throws InterruptedException If the thread watching/joining the group was interrupted.
+ */
+ void lead(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ LeadershipListener listener)
+ throws LeadException, InterruptedException;
+
+ /**
+ * A listener to be notified of changes in the leadership status.
+ * Implementers should be careful to avoid blocking operations in these callbacks.
+ */
+ interface LeadershipListener {
+
+ /**
+ * Notifies the listener that it is the current leader.
+ *
+ * @param control A controller handle to advertise and/or leave advertised presence.
+ */
+ void onLeading(LeaderControl control);
+
+ /**
+ * Notifies the listener that it is no longer leader.
+ */
+ void onDefeated();
+ }
+
+ /**
+ * A controller for the state of the leader. This will be provided to the leader upon election,
+ * which allows the leader to decide when to advertise as leader of the server set and terminate
+ * leadership at will.
+ */
+ interface LeaderControl {
+
+ /**
+ * Advertises the leader's server presence to clients.
+ *
+ * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+ * of the server set.
+ * @throws InterruptedException If interrupted while advertising.
+ */
+ void advertise() throws AdvertiseException, InterruptedException;
+
+ /**
+ * Leaves candidacy for leadership, removing advertised server presence if applicable.
+ *
+ * @throws LeaveException If the leader's status could not be updated or there was an error
+ * abdicating server set leadership.
+ */
+ void leave() throws LeaveException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * A {@link SingletonService} that vies for leadership through a {@link Candidate} and lets the
+ * elected leader advertise itself in a {@link ServerSet}.
+ */
+public class SingletonServiceImpl implements SingletonService {
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Creates a new singleton service that uses the supplied candidate to vie for leadership and
+   * then advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election.
+   * @param candidate The candidacy to use to vie for election.
+   */
+  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+    Preconditions.checkNotNull(serverSet);
+    Preconditions.checkNotNull(candidate);
+    this.serverSet = serverSet;
+    this.candidate = candidate;
+  }
+
+  /**
+   * Creates a candidate that can be combined with an existing server set to form a singleton
+   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    Group electionGroup = new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX);
+    return new CandidateImpl(electionGroup);
+  }
+
+  @Override
+  public void lead(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final LeadershipListener listener)
+      throws LeadException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    // Bridges candidate callbacks to the caller's listener; each election hands the listener a
+    // fresh control object bound to that election's abdicate command.
+    Leader leader = new Leader() {
+      @Override
+      public void onElected(ExceptionalCommand<JoinException> abdicate) {
+        listener.onLeading(new ElectedLeaderControl(endpoint, additionalEndpoints, abdicate));
+      }
+
+      @Override
+      public void onDefeated() {
+        listener.onDefeated();
+      }
+    };
+
+    try {
+      candidate.offerLeadership(leader);
+    } catch (JoinException e) {
+      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+    } catch (Group.WatchException e) {
+      throw new LeadException("Problem getting initial membership list for leadership group.", e);
+    }
+  }
+
+  /**
+   * Control handle given to an elected leader. Both operations are synchronized so they cannot
+   * run simultaneously on the same handle.
+   */
+  private final class ElectedLeaderControl implements LeaderControl {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+    private final ExceptionalCommand<JoinException> abdicate;
+    private final AtomicBoolean left = new AtomicBoolean(false);
+
+    // Stays null until advertise() has joined the server set.
+    private ServerSet.EndpointStatus endpointStatus = null;
+
+    ElectedLeaderControl(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints,
+        ExceptionalCommand<JoinException> abdicate) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+      this.abdicate = abdicate;
+    }
+
+    @Override
+    public synchronized void advertise() throws AdvertiseException, InterruptedException {
+      Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+      Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+      try {
+        endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+      } catch (JoinException e) {
+        throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+      }
+    }
+
+    @Override
+    public synchronized void leave() throws LeaveException {
+      Preconditions.checkState(left.compareAndSet(false, true), "Cannot leave more than once.");
+
+      // Withdraw the advertisement first. If that fails the exception propagates and the
+      // abdicate command below is not executed.
+      if (endpointStatus != null) {
+        try {
+          endpointStatus.leave();
+        } catch (ServerSet.UpdateException e) {
+          throw new LeaveException(
+              "Problem updating endpoint status for abdicating leader "
+                  + "at endpoint " + endpoint,
+              e);
+        }
+      }
+
+      try {
+        abdicate.execute();
+      } catch (JoinException e) {
+        throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
+      }
+    }
+  }
+}