You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:33 UTC
[19/37] aurora git commit: Import of Twitter Commons.
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
new file mode 100644
index 0000000..ea1c994
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
@@ -0,0 +1,52 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.logging;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.LogManager;
+
+/**
+ * A custom java.util.logging configuration class that loads the logging configuration from a
+ * properties file resource (as opposed to a file as natively supported by LogManager via
+ * java.util.logging.config.file). By default this configurator will look for the resource at
+ * /logging.properties but the resource path can be overridden by setting the system property with
+ * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}. To install this
+ * configurator you must specify the following system property:
+ * java.util.logging.config.class=com.twitter.common.util.logging.ResourceLoggingConfigurator
+ *
+ * @author John Sirois
+ */
+public class ResourceLoggingConfigurator {
+
+ /**
+ * A system property that controls where ResourceLoggingConfigurator looks for the logging
+ * configuration on the process classpath.
+ */
+ public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";
+
+ public ResourceLoggingConfigurator() throws IOException {
+ String loggingPropertiesResourcePath =
+ System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
+ InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath);
+ Preconditions.checkNotNull(loggingConfig,
+ "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath);
+ LogManager.getLogManager().readConfiguration(loggingConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
new file mode 100644
index 0000000..14822ff
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
@@ -0,0 +1,51 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.logging;
+
+import java.util.logging.LogManager;
+
+/**
+ * A LogManager which by default ignores calls to {@link #reset()}. This is useful to avoid missing
+ * log statements that occur during vm shutdown. The standard LogManager installs a
+ * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass
+ * nullifies that shutdown hook by disabling any reset of the LogManager by default.
+ *
+ * @author John Sirois
+ */
+public class UnresettableLogManager extends LogManager {
+
+ /**
+ * The system property that controls which LogManager the java.util.logging subsystem should load.
+ */
+ public static final String LOGGING_MANAGER = "java.util.logging.manager";
+
+ /**
+ * A system property which can be used to control an {@code UnresettableLogManager}'s behavior.
+ * If the UnresettableLogManager is installed, but an application still wants
+ * {@link LogManager#reset()} behavior, they can set this property to "false".
+ */
+ private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";
+
+ @Override
+ public void reset() throws SecurityException {
+ if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) {
+ System.err.println("UnresettableLogManager is ignoring a reset() request.");
+ } else {
+ super.reset();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
new file mode 100644
index 0000000..f677b1a
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
@@ -0,0 +1,83 @@
+package com.twitter.common.util.templating;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import com.google.common.base.Preconditions;
+
+import org.antlr.stringtemplate.AutoIndentWriter;
+import org.antlr.stringtemplate.StringTemplate;
+import org.antlr.stringtemplate.StringTemplateGroup;
+
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * A class to simplify the operations required to load a stringtemplate template file from the
+ * classpath and populate it.
+ */
+public class StringTemplateHelper {
+
+ private final StringTemplateGroup group;
+ private final String templatePath;
+
+ /**
+ * Creates a new template helper.
+ *
+ * @param templateContextClass Classpath context for the location of the template file.
+ * @param templateName Template file name (excluding .st suffix) relative to
+ * {@code templateContextClass}.
+ * @param cacheTemplates Whether the template should be cached.
+ */
+ public StringTemplateHelper(
+ Class<?> templateContextClass,
+ String templateName,
+ boolean cacheTemplates) {
+
+ MorePreconditions.checkNotBlank(templateName);
+ String templatePath =
+ templateContextClass.getPackage().getName().replace('.', '/') + "/" + templateName;
+ StringTemplateGroup group = new StringTemplateGroup(templateName);
+ Preconditions.checkNotNull(group.getInstanceOf(templatePath),
+ "Failed to load template at: %s", templatePath);
+
+ this.group = group;
+ if (!cacheTemplates) {
+ group.setRefreshInterval(0);
+ }
+ this.templatePath = templatePath;
+ }
+
+ /**
+ * Thrown when an exception is encountered while populating a template.
+ */
+ public static class TemplateException extends Exception {
+ public TemplateException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+
+ /**
+ * Writes the populated template to an output writer by providing a closure with access to
+ * the unpopulated template object.
+ *
+ * @param out Template output writer.
+ * @param parameterSetter Closure to populate the template.
+ * @throws TemplateException If an exception was encountered while populating the template.
+ */
+ public void writeTemplate(
+ Writer out,
+ Closure<StringTemplate> parameterSetter) throws TemplateException {
+
+ Preconditions.checkNotNull(out);
+ Preconditions.checkNotNull(parameterSetter);
+
+ StringTemplate stringTemplate = group.getInstanceOf(templatePath);
+ try {
+ parameterSetter.execute(stringTemplate);
+ stringTemplate.write(new AutoIndentWriter(out));
+ } catch (IOException e) {
+ throw new TemplateException("Failed to write template: " + e, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
new file mode 100644
index 0000000..d2eb318
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
@@ -0,0 +1,81 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.testing;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock for use in testing with a configurable value for {@link #nowMillis()}.
+ *
+ * @author John Sirois
+ */
+public class FakeClock implements Clock {
+ // Tests may need to use the clock from multiple threads, ensure liveness.
+ private volatile long nowNanos;
+
+ /**
+ * Sets what {@link #nowMillis()} will return until this method is called again with a new value
+ * for {@code now}.
+ *
+ * @param nowMillis the current time in milliseconds
+ */
+ public void setNowMillis(long nowMillis) {
+ Preconditions.checkArgument(nowMillis >= 0);
+ this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis);
+ }
+
+ /**
+ * Advances the current time by {@code millis} milliseconds. Time can be retarded by passing a
+ * negative value.
+ *
+ * @param period the amount of time to advance the current time by
+ */
+ public void advance(Amount<Long, Time> period) {
+ Preconditions.checkNotNull(period);
+ long newNanos = nowNanos + period.as(Time.NANOSECONDS);
+ Preconditions.checkArgument(newNanos >= 0,
+ "invalid period %s - would move current time to a negative value: %sns", period, newNanos);
+ nowNanos = newNanos;
+ }
+
+ @Override
+ public long nowMillis() {
+ return TimeUnit.NANOSECONDS.toMillis(nowNanos);
+ }
+
+ @Override
+ public long nowNanos() {
+ return nowNanos;
+ }
+
+ /**
+ * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis}
+ * after this method completes will consistently reveal that {@code millis} did in fact pass while
+ * waiting.
+ *
+ * @param millis the amount of time to wait in milliseconds
+ */
+ @Override
+ public void waitFor(long millis) {
+ advance(Amount.of(millis, Time.MILLISECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
new file mode 100644
index 0000000..fe25df8
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
@@ -0,0 +1,56 @@
+package com.twitter.common.util.testing;
+
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import org.omg.CORBA.PUBLIC_MEMBER;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * A ticker for use in testing with a configurable value for {@link #Ticker#read()}.
+ */
+public class FakeTicker extends Ticker{
+ private long nowNanos;
+
+ /**
+ * Sets what {@link #read()} will return until this method is called again with a new value
+ * for {@code now}.
+ *
+ * @param nowNanos the current time in nanoseconds
+ */
+ public void setNowNanos(long nowNanos) {
+ this.nowNanos = nowNanos;
+ }
+
+ @Override
+ public long read(){
+ return nowNanos;
+ }
+
+ /**
+ * Advances the current time by the given {@code period}. Time can be retarded by passing a
+ * negative value.
+ *
+ * @param period the amount of time to advance the current time by
+ */
+ public void advance(Amount<Long, Time> period) {
+ Preconditions.checkNotNull(period);
+ nowNanos = nowNanos + period.as(Time.NANOSECONDS);
+ }
+
+ /**
+ * Waits in fake time, immediately returning in real time; however a check of {@link #Ticker#read()}
+ * after this method completes will consistently reveal that {@code nanos} did in fact pass while
+ * waiting.
+ *
+ * @param nanos the amount of time to wait in nanoseconds
+ */
+ public void waitNanos(long nanos) {
+ advance(Amount.of(nanos, Time.NANOSECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
new file mode 100644
index 0000000..2b6eb21
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
@@ -0,0 +1,93 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.webassets.bootstrap;
+
+import com.google.common.io.Resources;
+import com.google.common.net.MediaType;
+import com.google.inject.AbstractModule;
+
+import com.twitter.common.application.http.Registration;
+
+/**
+ * A binding module to register bootstrap HTTP assets.
+ */
+public final class BootstrapModule extends AbstractModule {
+ /**
+ * Enum for available Bootstrap versions to choose from.
+ */
+ public enum BootstrapVersion {
+ VERSION_2_1_1 ("2.1.1"),
+ VERSION_2_3_2 ("2.3.2");
+
+ private final String version;
+
+ BootstrapVersion(String s) {
+ version = s;
+ }
+ }
+
+ private final String version;
+
+ /**
+ * Default constructor.
+ */
+ public BootstrapModule() {
+ this(BootstrapVersion.VERSION_2_1_1);
+ }
+
+ /**
+ * BootstrapModule Constructor.
+ *
+ * @param version supplies the bootstrap version to select.
+ */
+ public BootstrapModule(BootstrapVersion version) {
+ this.version = version.version;
+ }
+
+ private void register(String mountPath, String resourcePath, String contentType) {
+ Registration.registerHttpAsset(
+ binder(),
+ "/" + mountPath,
+ Resources.getResource(BootstrapModule.class, resourcePath),
+ contentType,
+ true);
+ }
+
+ @Override
+ protected void configure() {
+ register(
+ "css/bootstrap-responsive.min.css",
+ version + "/css/bootstrap-responsive.min.css",
+ MediaType.CSS_UTF_8.toString());
+ register(
+ "css/bootstrap.min.css",
+ version + "/css/bootstrap.min.css",
+ MediaType.CSS_UTF_8.toString());
+ register(
+ "img/glyphicons-halflings-white.png",
+ version + "/img/glyphicons-halflings-white.png",
+ MediaType.PNG.toString());
+ register(
+ "img/glyphicons-halflings.png",
+ version + "/img/glyphicons-halflings.png",
+ MediaType.PNG.toString());
+ register(
+ "js/bootstrap.min.js",
+ version + "/js/bootstrap.min.js",
+ MediaType.JAVASCRIPT_UTF_8.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
new file mode 100644
index 0000000..316b328
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
@@ -0,0 +1,39 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.webassets.jquery;
+
+import com.google.common.io.Resources;
+import com.google.common.net.MediaType;
+import com.google.inject.AbstractModule;
+
+import com.twitter.common.application.http.Registration;
+
+/**
+ * A binding module to register jQuery HTTP assets.
+ */
+public final class JQueryModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ Registration.registerHttpAsset(
+ binder(),
+ "/js/jquery.min.js",
+ Resources.getResource(JQueryModule.class, "js/jquery-1.8.2.min.js"),
+ MediaType.JAVASCRIPT_UTF_8.toString(),
+ true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..bc9ec63
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
@@ -0,0 +1,82 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+ /**
+ * Returns the current group leader by querying ZooKeeper synchronously.
+ *
+ * @return the current group leader's identifying data or {@link Optional#absent()} if there is
+ * no leader
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading the leader information
+ * @throws InterruptedException if this thread is interrupted getting the leader
+ */
+ public Optional<byte[]> getLeaderData()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+ /**
+ * Encapsulates a leader that can be elected and subsequently defeated.
+ */
+ interface Leader {
+
+ /**
+ * Called when this leader has been elected.
+ *
+ * @param abdicate a command that can be used to abdicate leadership and force a new election
+ */
+ void onElected(ExceptionalCommand<JoinException> abdicate);
+
+ /**
+ * Called when the leader has been ousted. Can occur either if the leader abdicates or if an
+ * external event causes the leader to lose its leadership role (session expiration).
+ */
+ void onDefeated();
+ }
+
+ /**
+ * Offers this candidate in leadership elections for as long as the current jvm process is alive.
+ * Upon election, the {@code onElected} callback will be executed and a command that can be used
+ * to abdicate leadership will be passed in. If the elected leader jvm process dies or the
+ * elected leader successfully abdicates then a new leader will be elected. Leaders that
+ * successfully abdicate are removed from the group and will not be eligible for leadership
+ * election unless {@link #offerLeadership(Leader)} is called again.
+ *
+ * @param leader the leader to notify of election and defeat events
+ * @throws JoinException if there was a problem joining the group
+ * @throws WatchException if there is a problem generating the 1st group membership list
+ * @throws InterruptedException if interrupted waiting to join the group and determine initial
+ * election results
+ * @return a supplier that can be queried to find out if this leader is currently elected
+ */
+ public Supplier<Boolean> offerLeadership(Leader leader)
+ throws JoinException, WatchException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..3361a7f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,184 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Implements leader election for small groups of candidates. This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+ private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());
+
+ private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+ private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
+ @Override public byte[] get() {
+ try {
+ return InetAddress.getLocalHost().getHostAddress().getBytes();
+ } catch (UnknownHostException e) {
+ LOG.log(Level.WARNING, "Failed to determine local address!", e);
+ return UNKNOWN_CANDIDATE_DATA;
+ }
+ }
+ };
+
+ private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+ new Function<Iterable<String>, String>() {
+ @Override public String apply(Iterable<String> candidates) {
+ return Ordering.natural().min(candidates);
+ }
+ };
+
+ private final Group group;
+ private final Function<Iterable<String>, String> judge;
+ private final Supplier<byte[]> dataSupplier;
+
+ /**
+ * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
+ * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
+ * 1st candidate and a default supplier that provides the ip address of this host according to
+ * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
+ */
+ public CandidateImpl(Group group) {
+ this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
+ }
+
+ /**
+ * Creates a candidate that can be used to offer leadership for the given {@code group} using
+ * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
+ * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
+ * will become available to all participants via the {@link Candidate#getLeaderData()} method.
+ */
+ public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
+ this(group, MOST_RECENT_JUDGE, dataSupplier);
+ }
+
+ /**
+ * Creates a candidate that can be used to offer leadership for the given {@code group}. The
+ * {@code judge} is used to pick the current leader from all group members whenever the group
+ * membership changes. To form a well-behaved election group with one leader, all candidates
+ * should use the same judge. The dataSupplier should produce bytes that identify this process
+ * as leader. These bytes will become available to all participants via the
+ * {@link Candidate#getLeaderData()} method.
+ */
+ public CandidateImpl(
+ Group group,
+ Function<Iterable<String>, String> judge,
+ Supplier<byte[]> dataSupplier) {
+ this.group = Preconditions.checkNotNull(group);
+ this.judge = Preconditions.checkNotNull(judge);
+ this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
+ }
+
+ @Override
+ public Optional<byte[]> getLeaderData()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+ String leaderId = getLeader(group.getMemberIds());
+ return leaderId == null
+ ? Optional.<byte[]>absent()
+ : Optional.of(group.getMemberData(leaderId));
+ }
+
+ @Override
+ public Supplier<Boolean> offerLeadership(final Leader leader)
+ throws JoinException, WatchException, InterruptedException {
+
+ final Membership membership = group.join(dataSupplier, new Command() {
+ @Override public void execute() {
+ leader.onDefeated();
+ }
+ });
+
+ final AtomicBoolean elected = new AtomicBoolean(false);
+ final AtomicBoolean abdicated = new AtomicBoolean(false);
+ group.watch(new GroupChangeListener() {
+ @Override public void onGroupChange(Iterable<String> memberIds) {
+ boolean noCandidates = Iterables.isEmpty(memberIds);
+ String memberId = membership.getMemberId();
+
+ if (noCandidates) {
+ LOG.warning("All candidates have temporarily left the group: " + group);
+ } else if (!Iterables.contains(memberIds, memberId)) {
+ LOG.severe(String.format(
+ "Current member ID %s is not a candidate for leader, current voting: %s",
+ memberId, memberIds));
+ } else {
+ boolean electedLeader = memberId.equals(getLeader(memberIds));
+ boolean previouslyElected = elected.getAndSet(electedLeader);
+
+ if (!previouslyElected && electedLeader) {
+ LOG.info(String.format("Candidate %s is now leader of group: %s",
+ membership.getMemberPath(), memberIds));
+
+ leader.onElected(new ExceptionalCommand<JoinException>() {
+ @Override public void execute() throws JoinException {
+ membership.cancel();
+ abdicated.set(true);
+ }
+ });
+ } else if (!electedLeader) {
+ if (previouslyElected) {
+ leader.onDefeated();
+ }
+ LOG.info(String.format(
+ "Candidate %s waiting for the next leader election, current voting: %s",
+ membership.getMemberPath(), memberIds));
+ }
+ }
+ }
+ });
+
+ return new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ return !abdicated.get() && elected.get();
+ }
+ };
+ }
+
+ @Nullable
+ private String getLeader(Iterable<String> memberIds) {
+ return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
new file mode 100644
index 0000000..2e7260b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
@@ -0,0 +1,211 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * A ServerSet that delegates all calls to other ServerSets.
+ */
+public class CompoundServerSet implements ServerSet {
+ private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
+
+ private final List<ServerSet> serverSets;
+ private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap();
+ private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList();
+ private Command stopWatching = null;
+ private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of();
+
+ /**
+ * Create new ServerSet from a list of serverSets.
+ *
+ * @param serverSets serverSets to which the calls will be delegated.
+ */
+ public CompoundServerSet(Iterable<ServerSet> serverSets) {
+ MorePreconditions.checkNotBlank(serverSets);
+ this.serverSets = ImmutableList.copyOf(serverSets);
+ }
+
+ private interface JoinOp {
+ EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException;
+ }
+
+ private interface StatusOp {
+ void changeStatus(EndpointStatus status) throws UpdateException;
+ }
+
+ private void changeStatus(
+ ImmutableList<EndpointStatus> statuses,
+ StatusOp statusOp) throws UpdateException {
+
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ int errorIdx = 1;
+ for (EndpointStatus endpointStatus : statuses) {
+ try {
+ statusOp.changeStatus(endpointStatus);
+ } catch (UpdateException exception) {
+ builder.add(String.format("[%d] %s", errorIdx++,
+ Throwables.getStackTraceAsString(exception)));
+ }
+ }
+ if (errorIdx > 1) {
+ throw new UpdateException(
+ "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build()));
+ }
+ }
+
+ private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException {
+ // Get the list of endpoint status from the serverSets.
+ ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder();
+ for (ServerSet serverSet : serverSets) {
+ builder.add(joiner.doJoin(serverSet));
+ }
+
+ final ImmutableList<EndpointStatus> statuses = builder.build();
+
+ return new EndpointStatus() {
+ @Override public void leave() throws UpdateException {
+ changeStatus(statuses, new StatusOp() {
+ @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+ status.leave();
+ }
+ });
+ }
+
+ @Override public void update(final Status newStatus) throws UpdateException {
+ changeStatus(statuses, new StatusOp() {
+ @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+ status.update(newStatus);
+ }
+ });
+ }
+ };
+ }
+
+ @Override
+ public EndpointStatus join(
+ final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints)
+ throws Group.JoinException, InterruptedException {
+
+ return doJoin(new JoinOp() {
+ @Override public EndpointStatus doJoin(ServerSet serverSet)
+ throws JoinException, InterruptedException {
+ return serverSet.join(endpoint, additionalEndpoints);
+ }
+ });
+ }
+
+ /*
+ * If any one of the serverSet throws an exception during respective join, the exception is
+ * propagated. Join is successful only if all the joins are successful.
+ *
+ * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a
+ * partially joined state.
+ *
+ * @see ServerSet#join(InetSocketAddress, Map, Status)
+ */
+ @Override
+ public EndpointStatus join(
+ final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints,
+ final Status status) throws Group.JoinException, InterruptedException {
+
+ return doJoin(new JoinOp() {
+ @Override public EndpointStatus doJoin(ServerSet serverSet)
+ throws JoinException, InterruptedException {
+
+ return serverSet.join(endpoint, additionalEndpoints, status);
+ }
+ });
+ }
+
+ @Override
+ public EndpointStatus join(
+ final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints,
+ final int shardId) throws JoinException, InterruptedException {
+
+ return doJoin(new JoinOp() {
+ @Override public EndpointStatus doJoin(ServerSet serverSet)
+ throws JoinException, InterruptedException {
+
+ return serverSet.join(endpoint, additionalEndpoints, shardId);
+ }
+ });
+ }
+
+ // Handles changes to the union of hosts.
+ private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) {
+ instanceCache.put(serverSet, hosts);
+
+ // Get the union of hosts.
+ ImmutableSet<ServiceInstance> currentHosts =
+ ImmutableSet.copyOf(Iterables.concat(instanceCache.values()));
+
+ // Check if the hosts have changed.
+ if (!currentHosts.equals(allHosts)) {
+ allHosts = currentHosts;
+
+ // Notify the monitors.
+ for (HostChangeMonitor<ServiceInstance> monitor : monitors) {
+ monitor.onChange(allHosts);
+ }
+ }
+ }
+
+ /**
+ * Monitor the CompoundServerSet.
+ *
+ * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the
+ * exception is propagated. The call is successful only if all the monitor calls to the
+ * underlying serverSets are successful.
+ *
+ * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not
+ * be monitored.
+ *
+ * @param monitor HostChangeMonitor instance used to monitor host changes.
+ * @return A command that, when executed, will stop monitoring all underlying server sets.
+ * @throws MonitorException If there was a problem monitoring any of the underlying server sets.
+ */
+ @Override
+ public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor)
+ throws MonitorException {
+ if (stopWatching == null) {
+ monitors.add(monitor);
+ ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder();
+
+ for (final ServerSet serverSet : serverSets) {
+ commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() {
+ @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+ handleChange(serverSet, hostSet);
+ }
+ }));
+ }
+
+ stopWatching = Commands.compound(commandsBuilder.build());
+ }
+
+ return stopWatching;
+ }
+
+ @Override
+ public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+ watch(monitor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..fdcd8d9
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
@@ -0,0 +1,42 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DistributedLock
+ *
+ * @author Florian Leibert
+ */
+public interface DistributedLock {
+ void lock() throws LockingException;
+
+ boolean tryLock(long timeout, TimeUnit unit);
+
+ void unlock() throws LockingException;
+
+ public static class LockingException extends RuntimeException {
+ public LockingException(String msg, Exception e) {
+ super(msg, e);
+ }
+
+ public LockingException(String msg) {
+ super(msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
new file mode 100644
index 0000000..7669f92
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
@@ -0,0 +1,289 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
+ * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
+ * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
+ * in order, therefore the client with the least ID according to natural ordering will hold the
+ * lock. Every other client watches the id immediately preceding its own id and checks for the lock
+ * in case of notification. The client holding the lock does the work and finally deletes the node,
+ * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
+ * avoided in most cases because if a client drops dead while holding the lock, the ZK session
+ * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
+ * could occur if the the worker thread on a client hangs but the zk-client thread is still alive.
+ * There could be an external monitor client that ensures that alerts are triggered if the least-id
+ * ephemeral node is present past a time-out.
+ * <p/>
+ * Note: Locking attempts will fail in case session expires!
+ *
+ * @author Florian Leibert
+ */
+@ThreadSafe
+public class DistributedLockImpl implements DistributedLock {
+
+ private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
+
+ private final ZooKeeperClient zkClient;
+ private final String lockPath;
+ private final ImmutableList<ACL> acl;
+
+ private final AtomicBoolean aborted = new AtomicBoolean(false);
+ private CountDownLatch syncPoint;
+ private boolean holdsLock = false;
+ private String currentId;
+ private String currentNode;
+ private String watchedNode;
+ private LockWatcher watcher;
+
+ /**
+ * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default
+ * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+ */
+ public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
+ this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ /**
+ * Creates a distributed lock using the given {@code zkClient} to coordinate locking.
+ *
+ * @param zkClient The ZooKeeper client to use.
+ * @param lockPath The path used to manage the lock under.
+ * @param acl The acl to apply to newly created lock nodes.
+ */
+ public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
+ this.zkClient = Preconditions.checkNotNull(zkClient);
+ this.lockPath = MorePreconditions.checkNotBlank(lockPath);
+ this.acl = ImmutableList.copyOf(acl);
+ this.syncPoint = new CountDownLatch(1);
+ }
+
+ private synchronized void prepare()
+ throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
+ LOG.log(Level.FINE, "Working with locking path:" + lockPath);
+
+ // Create an EPHEMERAL_SEQUENTIAL node.
+ currentNode =
+ zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+ // We only care about our actual id since we want to compare ourselves to siblings.
+ if (currentNode.contains("/")) {
+ currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1);
+ }
+ LOG.log(Level.FINE, "Received ID from zk:" + currentId);
+ this.watcher = new LockWatcher();
+ }
+
+ @Override
+ public synchronized void lock() throws LockingException {
+ if (holdsLock) {
+ throw new LockingException("Error, already holding a lock. Call unlock first!");
+ }
+ try {
+ prepare();
+ watcher.checkForLock();
+ syncPoint.await();
+ if (!holdsLock) {
+ throw new LockingException("Error, couldn't acquire the lock!");
+ }
+ } catch (InterruptedException e) {
+ cancelAttempt();
+ throw new LockingException("InterruptedException while trying to acquire lock!", e);
+ } catch (KeeperException e) {
+ // No need to clean up since the node wasn't created yet.
+ throw new LockingException("KeeperException while trying to acquire lock!", e);
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ // No need to clean up since the node wasn't created yet.
+ throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+ }
+ }
+
+ @Override
+ public synchronized boolean tryLock(long timeout, TimeUnit unit) {
+ if (holdsLock) {
+ throw new LockingException("Error, already holding a lock. Call unlock first!");
+ }
+ try {
+ prepare();
+ watcher.checkForLock();
+ boolean success = syncPoint.await(timeout, unit);
+ if (!success) {
+ return false;
+ }
+ if (!holdsLock) {
+ throw new LockingException("Error, couldn't acquire the lock!");
+ }
+ } catch (InterruptedException e) {
+ cancelAttempt();
+ return false;
+ } catch (KeeperException e) {
+ // No need to clean up since the node wasn't created yet.
+ throw new LockingException("KeeperException while trying to acquire lock!", e);
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ // No need to clean up since the node wasn't created yet.
+ throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized void unlock() throws LockingException {
+ if (currentId == null) {
+ throw new LockingException("Error, neither attempting to lock nor holding a lock!");
+ }
+ Preconditions.checkNotNull(currentId);
+ // Try aborting!
+ if (!holdsLock) {
+ aborted.set(true);
+ LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
+ } else {
+ LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
+ cleanup();
+ }
+ }
+
+ //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
+
+ private synchronized void cancelAttempt() {
+ LOG.log(Level.INFO, "Cancelling lock attempt!");
+ cleanup();
+ // Bubble up failure...
+ holdsLock = false;
+ syncPoint.countDown();
+ }
+
+ private void cleanup() {
+ LOG.info("Cleaning up!");
+ Preconditions.checkNotNull(currentId);
+ try {
+ Stat stat = zkClient.get().exists(currentNode, false);
+ if (stat != null) {
+ zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION);
+ } else {
+ LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ holdsLock = false;
+ aborted.set(false);
+ currentId = null;
+ currentNode = null;
+ watcher = null;
+ syncPoint = new CountDownLatch(1);
+ }
+
+ class LockWatcher implements Watcher {
+
+ public synchronized void checkForLock() {
+ MorePreconditions.checkNotBlank(currentId);
+
+ try {
+ List<String> candidates = zkClient.get().getChildren(lockPath, null);
+ ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
+
+ // Unexpected behavior if there are no children!
+ if (sortedMembers.isEmpty()) {
+ throw new LockingException("Error, member list is empty!");
+ }
+
+ int memberIndex = sortedMembers.indexOf(currentId);
+
+ // If we hold the lock
+ if (memberIndex == 0) {
+ holdsLock = true;
+ syncPoint.countDown();
+ } else {
+ final String nextLowestNode = sortedMembers.get(memberIndex - 1);
+ LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " +
+ "waiting for [%s] to release lock.", currentId, nextLowestNode));
+
+ watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
+ Stat stat = zkClient.get().exists(watchedNode, this);
+ if (stat == null) {
+ checkForLock();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+ "got interrupted. Trying to cancel lock acquisition.", currentId), e);
+ cancelAttempt();
+ } catch (KeeperException e) {
+ LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+ "got a KeeperException. Trying to cancel lock acquisition.", currentId), e);
+ cancelAttempt();
+ } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+ "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e);
+ cancelAttempt();
+ }
+ }
+
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ // this handles the case where we have aborted a lock and deleted ourselves but still have a
+ // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
+ if (!event.getPath().equals(watchedNode)) {
+ LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
+ return;
+ }
+ //TODO(Florian Leibert): Pull this into the outer class.
+ if (event.getType() == Watcher.Event.EventType.None) {
+ switch (event.getState()) {
+ case SyncConnected:
+ // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
+ LOG.info("Reconnected...");
+ break;
+ case Expired:
+ LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId));
+ cancelAttempt();
+ break;
+ }
+ } else if (event.getType() == Event.EventType.NodeDeleted) {
+ checkForLock();
+ } else {
+ LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
new file mode 100644
index 0000000..81c451c
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
@@ -0,0 +1,711 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.base.ExceptionalSupplier;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.util.BackoffHelper;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups. The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+ private static final Logger LOG = Logger.getLogger(Group.class.getName());
+
+ private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+ private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+ private final ZooKeeperClient zkClient;
+ private final ImmutableList<ACL> acl;
+ private final String path;
+
+ private final NodeScheme nodeScheme;
+ private final Predicate<String> nodeNameFilter;
+
+ private final BackoffHelper backoffHelper;
+
+ /**
+ * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or
+ * duplicate slashes will be normalized. For example, all the following paths would create a
+ * group at the normalized path /my/distributed/group:
+ * <ul>
+ * <li>/my/distributed/group
+ * <li>/my/distributed/group/
+ * <li>/my/distributed//group
+ * </ul>
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param acl the ACL to use for creating the persistent group path if it does not already exist
+ * @param path the absolute persistent path that represents this group
+ * @param nodeScheme the scheme that defines how nodes are created
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+ this.zkClient = Preconditions.checkNotNull(zkClient);
+ this.acl = ImmutableList.copyOf(acl);
+ this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
+
+ this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
+ nodeNameFilter = new Predicate<String>() {
+ @Override public boolean apply(String nodeName) {
+ return Group.this.nodeScheme.isMember(nodeName);
+ }
+ };
+
+ backoffHelper = new BackoffHelper();
+ }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
+ * {@code namePrefix} of 'member_'.
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+ this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+ }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
+ * {@link DefaultScheme} using {@code namePrefix}.
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+ this(zkClient, acl, path, new DefaultScheme(namePrefix));
+ }
+
+ public String getMemberPath(String memberId) {
+ return path + "/" + MorePreconditions.checkNotBlank(memberId);
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getMemberId(String nodePath) {
+ MorePreconditions.checkNotBlank(nodePath);
+ Preconditions.checkArgument(nodePath.startsWith(path + "/"),
+ "Not a member of this group[%s]: %s", path, nodePath);
+
+ String memberId = StringUtils.substringAfterLast(nodePath, "/");
+ Preconditions.checkArgument(nodeScheme.isMember(memberId),
+ "Not a group member: %s", memberId);
+ return memberId;
+ }
+
+ /**
+ * Returns the current list of group member ids by querying ZooKeeper synchronously.
+ *
+ * @return the ids of all the present members of this group
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading this group's member ids
+ * @throws InterruptedException if this thread is interrupted listing the group members
+ */
+ public Iterable<String> getMemberIds()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+ return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
+ }
+
+ /**
+ * Gets the data for one of this groups members by querying ZooKeeper synchronously.
+ *
+ * @param memberId the id of the member whose data to retrieve
+ * @return the data associated with the {@code memberId}
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading this member's data
+ * @throws InterruptedException if this thread is interrupted retrieving the member data
+ */
+ public byte[] getMemberData(String memberId)
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+ return zkClient.get().getData(getMemberPath(memberId), false, null);
+ }
+
+ /**
+ * Represents membership in a distributed group.
+ */
+ public interface Membership {
+
+ /**
+ * Returns the persistent ZooKeeper path that represents this group.
+ */
+ String getGroupPath();
+
+ /**
+ * Returns the id (ZooKeeper node name) of this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberId();
+
+ /**
+ * Returns the full ZooKeeper path to this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberPath();
+
+ /**
+ * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
+ * {@link Group#join()}.
+ *
+ * @return the new membership data
+ * @throws UpdateException if there was a problem updating the membership data
+ */
+ byte[] updateMemberData() throws UpdateException;
+
+ /**
+ * Cancels group membership by deleting the associated ZooKeeper member node.
+ *
+ * @throws JoinException if there is a problem deleting the node
+ */
+ void cancel() throws JoinException;
+ }
+
+ /**
+ * Indicates an error joining a group.
+ */
+ public static class JoinException extends Exception {
+ public JoinException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Indicates an error updating a group member's data.
+ */
+ public static class UpdateException extends Exception {
+ public UpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Equivalent to calling {@code join(null, null)}.
+ */
+ public final Membership join() throws JoinException, InterruptedException {
+ return join(NO_MEMBER_DATA, null);
+ }
+
+ /**
+ * Equivalent to calling {@code join(memberData, null)}.
+ */
+ public final Membership join(Supplier<byte[]> memberData)
+ throws JoinException, InterruptedException {
+
+ return join(memberData, null);
+ }
+
+ /**
+ * Equivalent to calling {@code join(null, onLoseMembership)}.
+ */
+ public final Membership join(@Nullable final Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ return join(NO_MEMBER_DATA, onLoseMembership);
+ }
+
+ /**
+ * Joins this group and returns the resulting Membership when successful. Membership will be
+ * automatically cancelled when the current jvm process dies; however the returned Membership
+ * object can be used to cancel membership earlier. Unless
+ * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will
+ * be maintained by re-establishing it silently in the background.
+ *
+ * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an
+ * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
+ * membership in the group.
+ *
+ * @param memberData a supplier of the data to store in the member node
+ * @param onLoseMembership a callback to notify when membership is lost
+ * @return a Membership object with the member details
+ * @throws JoinException if there was a problem joining the group
+ * @throws InterruptedException if this thread is interrupted awaiting completion of the join
+ */
+ public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ Preconditions.checkNotNull(memberData);
+ ensurePersistentGroupPath();
+
+ final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
+ return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() {
+ @Override public Membership get() throws JoinException {
+ try {
+ return groupJoiner.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to join group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
+ return null;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
+ return null;
+ } else {
+ throw new JoinException("Problem joining partition group at path: " + path, e);
+ }
+ }
+ }
+ });
+ }
+
+ private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+ backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
+ @Override public Boolean get() throws JoinException {
+ try {
+ ZooKeeperUtils.ensurePath(zkClient, acl, path);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e);
+ return false;
+ } else {
+ throw new JoinException("Problem ensuring group at path: " + path, e);
+ }
+ }
+ }
+ });
+ }
+
+ private class ActiveMembership implements Membership {
+ private final Supplier<byte[]> memberData;
+ private final Command onLoseMembership;
+ private String nodePath;
+ private String memberId;
+ private volatile boolean cancelled;
+ private byte[] membershipData;
+
+ public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+ this.memberData = memberData;
+ this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
+ }
+
+ @Override
+ public String getGroupPath() {
+ return path;
+ }
+
+ @Override
+ public synchronized String getMemberId() {
+ return memberId;
+ }
+
+ @Override
+ public synchronized String getMemberPath() {
+ return nodePath;
+ }
+
+ @Override
+ public synchronized byte[] updateMemberData() throws UpdateException {
+ byte[] membershipData = memberData.get();
+ if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
+ try {
+ zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
+ this.membershipData = membershipData;
+ } catch (KeeperException e) {
+ throw new UpdateException("Problem updating membership data.", e);
+ } catch (InterruptedException e) {
+ throw new UpdateException("Interrupted attempting to update membership data.", e);
+ } catch (ZooKeeperConnectionException e) {
+ throw new UpdateException(
+ "Could not connect to the ZooKeeper cluster to update membership data.", e);
+ }
+ }
+ return membershipData;
+ }
+
+ @Override
+ public synchronized void cancel() throws JoinException {
+ if (!cancelled) {
+ try {
+ backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
+ @Override public Boolean get() throws JoinException {
+ try {
+ zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (NoNodeException e) {
+ LOG.info("Membership already cancelled, node at path: " + nodePath +
+ " has been deleted");
+ return true;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e);
+ return false;
+ } else {
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ }
+ });
+ cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ }
+
+ private class CancelledException extends IllegalStateException { /* marker */ }
+
+ synchronized Membership join()
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ if (cancelled) {
+ throw new CancelledException();
+ }
+
+ if (nodePath == null) {
+ // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+ // registered once.
+ zkClient.registerExpirationHandler(new Command() {
+ @Override public void execute() {
+ tryJoin();
+ }
+ });
+ }
+
+ byte[] membershipData = memberData.get();
+ String nodeName = nodeScheme.createName(membershipData);
+ CreateMode createMode = nodeScheme.isSequential()
+ ? CreateMode.EPHEMERAL_SEQUENTIAL
+ : CreateMode.EPHEMERAL;
+ nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+ memberId = Group.this.getMemberId(nodePath);
+ LOG.info("Set group member ID to " + memberId);
+ this.membershipData = membershipData;
+
+ // Re-join if our ephemeral node goes away due to maliciousness.
+ zkClient.get().exists(nodePath, new Watcher() {
+ @Override public void process(WatchedEvent event) {
+ if (event.getType() == EventType.NodeDeleted) {
+ tryJoin();
+ }
+ }
+ });
+
+ return this;
+ }
+
+ private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+ new ExceptionalSupplier<Boolean, InterruptedException>() {
+ @Override public Boolean get() throws InterruptedException {
+ try {
+ join();
+ return true;
+ } catch (CancelledException e) {
+ // Lost a cancel race - that's ok.
+ return true;
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e);
+ return false;
+ } else {
+ throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+ }
+ }
+ }
+ };
+
+ private synchronized void tryJoin() {
+ onLoseMembership.execute();
+ try {
+ backoffHelper.doUntilSuccess(tryJoin);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
+ }
+ }
+ }
+
+ /**
+ * An interface to an object that listens for changes to a group's membership.
+ */
+ public interface GroupChangeListener {
+
+ /**
+ * Called whenever group membership changes with the new list of member ids.
+ *
+ * @param memberIds the current member ids
+ */
+ void onGroupChange(Iterable<String> memberIds);
+ }
+
+ /**
+ * An interface that dictates the scheme to use for storing and filtering nodes that represent
+ * members of a distributed group.
+ */
+ public interface NodeScheme {
+ /**
+ * Determines if a child node is a member of a group by examining the node's name.
+ *
+ * @param nodeName the name of a child node found in a group
+ * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+ */
+ boolean isMember(String nodeName);
+
+ /**
+ * Generates a node name for the node representing this process in the distributed group.
+ *
+ * @param membershipData the data that will be stored in this node
+ * @return the name for the node that will represent this process in the group
+ */
+ String createName(byte[] membershipData);
+
+ /**
+ * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+ *
+ * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+ */
+ boolean isSequential();
+ }
+
+ /**
+ * Indicates an error watching a group.
+ */
+ public static class WatchException extends Exception {
+ public WatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Watches this group for the lifetime of this jvm process. This method will block until the
+ * current group members are available, notify the {@code groupChangeListener} and then return.
+ * All further changes to the group membership will cause notifications on a background thread.
+ *
+ * @param groupChangeListener the listener to notify of group membership change events
+ * @return A command which, when executed, will stop watching the group.
+ * @throws WatchException if there is a problem generating the 1st group membership list
+ * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+ */
+ public final Command watch(final GroupChangeListener groupChangeListener)
+ throws WatchException, InterruptedException {
+ Preconditions.checkNotNull(groupChangeListener);
+
+ try {
+ ensurePersistentGroupPath();
+ } catch (JoinException e) {
+ throw new WatchException("Failed to create group path: " + path, e);
+ }
+
+ final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+ backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() {
+ @Override public Boolean get() throws WatchException {
+ try {
+ groupMonitor.watchGroup();
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
+ return null;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
+ return null;
+ } else {
+ throw new WatchException("Problem trying to watch group at path: " + path, e);
+ }
+ }
+ }
+ });
+ return new Command() {
+ @Override public void execute() {
+ groupMonitor.stopWatching();
+ }
+ };
+ }
+
+ /**
+ * Helps continuously monitor a group for membership changes.
+ */
+ private class GroupMonitor {
+ private final GroupChangeListener groupChangeListener;
+ private volatile boolean stopped = false;
+ private Set<String> members;
+
+ GroupMonitor(GroupChangeListener groupChangeListener) {
+ this.groupChangeListener = groupChangeListener;
+ }
+
+ private final Watcher groupWatcher = new Watcher() {
+ @Override public final void process(WatchedEvent event) {
+ if (event.getType() == EventType.NodeChildrenChanged) {
+ tryWatchGroup();
+ }
+ }
+ };
+
+ private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+ new ExceptionalSupplier<Boolean, InterruptedException>() {
+ @Override public Boolean get() throws InterruptedException {
+ try {
+ watchGroup();
+ return true;
+ } catch (ZooKeeperConnectionException e) {
+ LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e);
+ return false;
+ } else {
+ throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+ }
+ }
+ }
+ };
+
+ private void tryWatchGroup() {
+ if (stopped) {
+ return;
+ }
+
+ try {
+ backoffHelper.doUntilSuccess(tryWatchGroup);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+ }
+ }
+
+ private void watchGroup()
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ if (stopped) {
+ return;
+ }
+
+ List<String> children = zkClient.get().getChildren(path, groupWatcher);
+ setMembers(Iterables.filter(children, nodeNameFilter));
+ }
+
+ private void stopWatching() {
+ // TODO(William Farner): Cancel the watch when
+ // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+ LOG.info("Stopping watch on " + this);
+ stopped = true;
+ }
+
+ synchronized void setMembers(Iterable<String> members) {
+ if (stopped) {
+ LOG.info("Suppressing membership update, no longer watching " + this);
+ return;
+ }
+
+ if (this.members == null) {
+ // Reset our watch on the group if session expires - only needs to be registered once.
+ zkClient.registerExpirationHandler(new Command() {
+ @Override public void execute() {
+ tryWatchGroup();
+ }
+ });
+ }
+
+ Set<String> membership = ImmutableSet.copyOf(members);
+ if (!membership.equals(this.members)) {
+ groupChangeListener.onGroupChange(members);
+ this.members = membership;
+ }
+ }
+ }
+
+ /**
+ * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+ * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+ * prefix is "member_", the node's full path will look something like
+ * {@code /discovery/servicename/member_0000000007}.
+ */
+ public static class DefaultScheme implements NodeScheme {
+ private final String namePrefix;
+ private final Pattern namePattern;
+
+ /**
+ * Creates a sequential node scheme based on the given node name prefix.
+ *
+ * @param namePrefix the prefix for the names of the member nodes
+ */
+ public DefaultScheme(String namePrefix) {
+ this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+ namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+ }
+
+ @Override
+ public boolean isMember(String nodeName) {
+ return namePattern.matcher(nodeName).matches();
+ }
+
+ @Override
+ public String createName(byte[] membershipData) {
+ return namePrefix;
+ }
+
+ @Override
+ public boolean isSequential() {
+ return true;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Group " + path;
+ }
+}