You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:33 UTC

[19/37] aurora git commit: Import of Twitter Commons.

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
new file mode 100644
index 0000000..ea1c994
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
@@ -0,0 +1,52 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.logging;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.LogManager;
+
+/**
+ * A custom java.util.logging configuration class that loads the logging configuration from a
+ * properties file resource (as opposed to a file as natively supported by LogManager via
+ * java.util.logging.config.file).  By default this configurator will look for the resource at
+ * /logging.properties but the resource path can be overridden by setting the system property with
+ * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}.  To install this
+ * configurator you must specify the following system property:
+ * java.util.logging.config.class=com.twitter.common.util.logging.ResourceLoggingConfigurator
+ *
+ * @author John Sirois
+ */
+public class ResourceLoggingConfigurator {
+
+  /**
+   * A system property that controls where ResourceLoggingConfigurator looks for the logging
+   * configuration on the process classpath.
+   */
+  public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";
+
+  /**
+   * Loads the logging configuration from the classpath resource named by the
+   * {@link #LOGGING_PROPERTIES_RESOURCE_PATH} system property (falling back to
+   * {@code /logging.properties}) and hands it to the global {@link LogManager}.
+   *
+   * @throws IOException if the configuration resource could not be read or closed
+   * @throws NullPointerException if no resource exists at the configured resource path
+   */
+  public ResourceLoggingConfigurator() throws IOException {
+    String loggingPropertiesResourcePath =
+        System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
+    InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath);
+    Preconditions.checkNotNull(loggingConfig,
+        "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath);
+    try {
+      LogManager.getLogManager().readConfiguration(loggingConfig);
+    } finally {
+      // readConfiguration(InputStream) does not take ownership of the stream, so it must be
+      // released here to avoid leaking the underlying resource handle.
+      loggingConfig.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
new file mode 100644
index 0000000..14822ff
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
@@ -0,0 +1,51 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.logging;
+
+import java.util.logging.LogManager;
+
+/**
+ * A LogManager which by default ignores calls to {@link #reset()}.  This is useful to avoid missing
+ * log statements that occur during vm shutdown.  The standard LogManager installs a
+ * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass
+ * nullifies that shutdown hook by disabling any reset of the LogManager by default.
+ *
+ * @author John Sirois
+ */
+public class UnresettableLogManager extends LogManager {
+
+  /**
+   * The system property that controls which LogManager the java.util.logging subsystem should load.
+   */
+  public static final String LOGGING_MANAGER = "java.util.logging.manager";
+
+  /**
+   * A system property which can be used to control an {@code UnresettableLogManager}'s behavior.
+   * If the UnresettableLogManager is installed, but an application still wants
+   * {@link LogManager#reset()} behavior, they can set this property to "false".
+   */
+  private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";
+
+  @Override
+  public void reset() throws SecurityException {
+    if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) {
+      System.err.println("UnresettableLogManager is ignoring a reset() request.");
+    } else {
+      super.reset();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
new file mode 100644
index 0000000..f677b1a
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
@@ -0,0 +1,83 @@
+package com.twitter.common.util.templating;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import com.google.common.base.Preconditions;
+
+import org.antlr.stringtemplate.AutoIndentWriter;
+import org.antlr.stringtemplate.StringTemplate;
+import org.antlr.stringtemplate.StringTemplateGroup;
+
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * A class to simplify the operations required to load a stringtemplate template file from the
+ * classpath and populate it.
+ */
+public class StringTemplateHelper {
+
+  private final StringTemplateGroup group;
+  private final String templatePath;
+
+  /**
+   * Builds a helper bound to a single template that lives on the classpath next to a given class.
+   *
+   * @param templateContextClass Class whose package determines where the template is looked up.
+   * @param templateName Template file name (excluding .st suffix) relative to
+   *     {@code templateContextClass}.
+   * @param cacheTemplates Whether the template should be cached.
+   */
+  public StringTemplateHelper(
+      Class<?> templateContextClass,
+      String templateName,
+      boolean cacheTemplates) {
+
+    MorePreconditions.checkNotBlank(templateName);
+    String packageDir = templateContextClass.getPackage().getName().replace('.', '/');
+    this.templatePath = packageDir + "/" + templateName;
+    this.group = new StringTemplateGroup(templateName);
+
+    // Resolve the template once up front so a bad path is reported at construction time.
+    Preconditions.checkNotNull(this.group.getInstanceOf(this.templatePath),
+        "Failed to load template at: %s", this.templatePath);
+
+    if (!cacheTemplates) {
+      this.group.setRefreshInterval(0);
+    }
+  }
+
+  /**
+   * Signals a failure that occurred while a template was being populated or written.
+   */
+  public static class TemplateException extends Exception {
+    public TemplateException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+
+  /**
+   * Obtains a fresh instance of the template, lets the supplied closure fill in its parameters
+   * and then renders the result to the given writer.
+   *
+   * @param out Template output writer.
+   * @param parameterSetter Closure to populate the template.
+   * @throws TemplateException If an exception was encountered while populating the template.
+   */
+  public void writeTemplate(
+      Writer out,
+      Closure<StringTemplate> parameterSetter) throws TemplateException {
+
+    Preconditions.checkNotNull(out);
+    Preconditions.checkNotNull(parameterSetter);
+
+    StringTemplate template = group.getInstanceOf(templatePath);
+    try {
+      parameterSetter.execute(template);
+      AutoIndentWriter indentingOut = new AutoIndentWriter(out);
+      template.write(indentingOut);
+    } catch (IOException e) {
+      throw new TemplateException("Failed to write template: " + e, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
new file mode 100644
index 0000000..d2eb318
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
@@ -0,0 +1,81 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.testing;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock for use in testing with a configurable value for {@link #nowMillis()}.
+ *
+ * @author John Sirois
+ */
+public class FakeClock implements Clock {
+  // Tests may need to use the clock from multiple threads, ensure liveness.
+  private volatile long nowNanos;
+
+  /**
+   * Sets what {@link #nowMillis()} will return until this method is called again with a new value
+   * for {@code now}.
+   *
+   * @param nowMillis the current time in milliseconds
+   */
+  public void setNowMillis(long nowMillis) {
+    Preconditions.checkArgument(nowMillis >= 0);
+    this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis);
+  }
+
+  /**
+   * Advances the current time by {@code millis} milliseconds.  Time can be retarded by passing a
+   * negative value.
+   *
+   * @param period the amount of time to advance the current time by
+   */
+  public void advance(Amount<Long, Time> period) {
+    Preconditions.checkNotNull(period);
+    long newNanos = nowNanos + period.as(Time.NANOSECONDS);
+    Preconditions.checkArgument(newNanos >= 0,
+        "invalid period %s - would move current time to a negative value: %sns", period, newNanos);
+    nowNanos = newNanos;
+  }
+
+  @Override
+  public long nowMillis() {
+    return TimeUnit.NANOSECONDS.toMillis(nowNanos);
+  }
+
+  @Override
+  public long nowNanos() {
+    return nowNanos;
+  }
+
+  /**
+   * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis}
+   * after this method completes will consistently reveal that {@code millis} did in fact pass while
+   * waiting.
+   *
+   * @param millis the amount of time to wait in milliseconds
+   */
+  @Override
+  public void waitFor(long millis) {
+    advance(Amount.of(millis, Time.MILLISECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
new file mode 100644
index 0000000..fe25df8
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
@@ -0,0 +1,56 @@
+package com.twitter.common.util.testing;
+
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import org.omg.CORBA.PUBLIC_MEMBER;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * A ticker for use in testing with a configurable value for {@link #Ticker#read()}.
+ */
+public class FakeTicker extends Ticker{
+  private long nowNanos;
+
+  /**
+   * Sets what {@link #read()} will return until this method is called again with a new value
+   * for {@code now}.
+   *
+   * @param nowNanos the current time in nanoseconds
+   */
+  public void setNowNanos(long nowNanos) {
+    this.nowNanos = nowNanos;
+  }
+
+  @Override
+  public long read(){
+    return nowNanos;
+  }
+
+  /**
+   * Advances the current time by the given {@code period}.  Time can be retarded by passing a
+   * negative value.
+   *
+   * @param period the amount of time to advance the current time by
+   */
+  public void advance(Amount<Long, Time> period) {
+    Preconditions.checkNotNull(period);
+    nowNanos = nowNanos + period.as(Time.NANOSECONDS);
+  }
+
+  /**
+   * Waits in fake time, immediately returning in real time; however a check of {@link #Ticker#read()}
+   * after this method completes will consistently reveal that {@code nanos} did in fact pass while
+   * waiting.
+   *
+   * @param nanos the amount of time to wait in nanoseconds
+   */
+  public void waitNanos(long nanos) {
+    advance(Amount.of(nanos, Time.NANOSECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
new file mode 100644
index 0000000..2b6eb21
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
@@ -0,0 +1,93 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.webassets.bootstrap;
+
+import com.google.common.io.Resources;
+import com.google.common.net.MediaType;
+import com.google.inject.AbstractModule;
+
+import com.twitter.common.application.http.Registration;
+
+/**
+ * A binding module to register bootstrap HTTP assets.
+ */
+public final class BootstrapModule extends AbstractModule {
+  /**
+   * The Bootstrap releases whose assets can be served by this module.
+   */
+  public enum BootstrapVersion {
+    VERSION_2_1_1 ("2.1.1"),
+    VERSION_2_3_2 ("2.3.2");
+
+    private final String version;
+
+    BootstrapVersion(String version) {
+      this.version = version;
+    }
+  }
+
+  private final String version;
+
+  /**
+   * Creates a module serving the 2.1.1 assets.
+   */
+  public BootstrapModule() {
+    this(BootstrapVersion.VERSION_2_1_1);
+  }
+
+  /**
+   * Creates a module serving the assets of the requested release.
+   *
+   * @param version supplies the bootstrap version to select.
+   */
+  public BootstrapModule(BootstrapVersion version) {
+    this.version = version.version;
+  }
+
+  // Binds the classpath resource at resourcePath (relative to this class) to "/" + mountPath.
+  private void register(String mountPath, String resourcePath, String contentType) {
+    Registration.registerHttpAsset(
+        binder(),
+        "/" + mountPath,
+        Resources.getResource(BootstrapModule.class, resourcePath),
+        contentType,
+        true);
+  }
+
+  // Mounts an asset whose resource lives under the directory named after the selected version.
+  private void registerVersioned(String assetPath, MediaType mediaType) {
+    register(assetPath, version + "/" + assetPath, mediaType.toString());
+  }
+
+  @Override
+  protected void configure() {
+    registerVersioned("css/bootstrap-responsive.min.css", MediaType.CSS_UTF_8);
+    registerVersioned("css/bootstrap.min.css", MediaType.CSS_UTF_8);
+    registerVersioned("img/glyphicons-halflings-white.png", MediaType.PNG);
+    registerVersioned("img/glyphicons-halflings.png", MediaType.PNG);
+    registerVersioned("js/bootstrap.min.js", MediaType.JAVASCRIPT_UTF_8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
new file mode 100644
index 0000000..316b328
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
@@ -0,0 +1,39 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.webassets.jquery;
+
+import com.google.common.io.Resources;
+import com.google.common.net.MediaType;
+import com.google.inject.AbstractModule;
+
+import com.twitter.common.application.http.Registration;
+
+/**
+ * A binding module to register jQuery HTTP assets.
+ */
+public final class JQueryModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    Registration.registerHttpAsset(
+        binder(),
+        "/js/jquery.min.js",
+        Resources.getResource(JQueryModule.class, "js/jquery-1.8.2.min.js"),
+        MediaType.JAVASCRIPT_UTF_8.toString(),
+        true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..bc9ec63
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
@@ -0,0 +1,82 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+  /**
+   * Synchronously asks ZooKeeper who currently leads the group.
+   *
+   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
+   *     no leader
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading the leader information
+   * @throws InterruptedException if this thread is interrupted getting the leader
+   */
+  Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * Callback contract for a participant that can win and later lose leadership.
+   */
+  interface Leader {
+
+    /**
+     * Invoked once this leader has won the election.
+     *
+     * @param abdicate a command that can be used to abdicate leadership and force a new election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Invoked once this leader has lost its leadership, whether through its own abdication or
+     * through an external event such as session expiration.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Enters this candidate into leadership elections for as long as the current jvm process is
+   * alive.  When the candidate wins, {@code onElected} is executed with a command that can be used
+   * to abdicate leadership.  A new leader is elected if the elected leader's jvm process dies or
+   * it successfully abdicates.  A leader that abdicated successfully leaves the group and only
+   * becomes eligible again after another call to {@link #offerLeadership(Leader)}.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @return a supplier that can be queried to find out if this leader is currently elected
+   * @throws JoinException if there was a problem joining the group
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to join the group and determine initial
+   *     election results
+   */
+  Supplier<Boolean> offerLeadership(Leader leader)
+      throws JoinException, WatchException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..3361a7f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,184 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Implements leader election for small groups of candidates.  This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());
+
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  // Default leader data: this host's IP address, or UNKNOWN_CANDIDATE_DATA if it cannot be found.
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
+    @Override public byte[] get() {
+      try {
+        // Encode explicitly as UTF-8, matching UNKNOWN_CANDIDATE_DATA, so the published bytes do
+        // not depend on the platform default charset of whichever host writes them.
+        return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+      } catch (UnknownHostException e) {
+        LOG.log(Level.WARNING, "Failed to determine local address!", e);
+        return UNKNOWN_CANDIDATE_DATA;
+      }
+    }
+  };
+
+  // Despite its name this judge picks the naturally smallest member id, i.e. the lowest numbered
+  // candidate node as described on the public constructors.
+  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+      new Function<Iterable<String>, String>() {
+        @Override public String apply(Iterable<String> candidates) {
+          return Ordering.natural().min(candidates);
+        }
+      };
+
+  private final Group group;
+  private final Function<Iterable<String>, String> judge;
+  private final Supplier<byte[]> dataSupplier;
+
+  /**
+   * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
+   * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
+   * 1st candidate and a default supplier that provides the ip address of this host according to
+   * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
+   *
+   * @param group the group whose members compete for leadership
+   */
+  public CandidateImpl(Group group) {
+    this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group} using
+   * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
+   * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
+   * will become available to all participants via the {@link Candidate#getLeaderData()} method.
+   *
+   * @param group the group whose members compete for leadership
+   * @param dataSupplier supplies the bytes identifying this process as leader
+   */
+  public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
+    this(group, MOST_RECENT_JUDGE, dataSupplier);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.  The
+   * {@code judge} is used to pick the current leader from all group members whenever the group
+   * membership changes. To form a well-behaved election group with one leader, all candidates
+   * should use the same judge. The dataSupplier should produce bytes that identify this process
+   * as leader. These bytes will become available to all participants via the
+   * {@link Candidate#getLeaderData()} method.
+   *
+   * @param group the group whose members compete for leadership
+   * @param judge picks the leader's member id out of the current member ids
+   * @param dataSupplier supplies the bytes identifying this process as leader
+   * @throws NullPointerException if any argument is null
+   */
+  public CandidateImpl(
+      Group group,
+      Function<Iterable<String>, String> judge,
+      Supplier<byte[]> dataSupplier) {
+    this.group = Preconditions.checkNotNull(group);
+    this.judge = Preconditions.checkNotNull(judge);
+    this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
+  }
+
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    // The command handed to join() reports defeat to the leader when the group runs it.
+    final Membership membership = group.join(dataSupplier, new Command() {
+      @Override public void execute() {
+        leader.onDefeated();
+      }
+    });
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+    group.watch(new GroupChangeListener() {
+        @Override public void onGroupChange(Iterable<String> memberIds) {
+          boolean noCandidates = Iterables.isEmpty(memberIds);
+          String memberId = membership.getMemberId();
+
+          if (noCandidates) {
+            LOG.warning("All candidates have temporarily left the group: " + group);
+          } else if (!Iterables.contains(memberIds, memberId)) {
+            LOG.severe(String.format(
+                "Current member ID %s is not a candidate for leader, current voting: %s",
+                memberId, memberIds));
+          } else {
+            boolean electedLeader = memberId.equals(getLeader(memberIds));
+            boolean previouslyElected = elected.getAndSet(electedLeader);
+
+            // Only transitions trigger callbacks: not-elected -> elected fires onElected and
+            // elected -> not-elected fires onDefeated.
+            if (!previouslyElected && electedLeader) {
+              LOG.info(String.format("Candidate %s is now leader of group: %s",
+                  membership.getMemberPath(), memberIds));
+
+              leader.onElected(new ExceptionalCommand<JoinException>() {
+                @Override public void execute() throws JoinException {
+                  membership.cancel();
+                  abdicated.set(true);
+                }
+              });
+            } else if (!electedLeader) {
+              if (previouslyElected) {
+                leader.onDefeated();
+              }
+              LOG.info(String.format(
+                  "Candidate %s waiting for the next leader election, current voting: %s",
+                  membership.getMemberPath(), memberIds));
+            }
+          }
+        }
+      });
+
+    // Reports leadership only while elected and not yet abdicated.
+    return new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return !abdicated.get() && elected.get();
+        }
+      };
+  }
+
+  /**
+   * Applies the judge to the given member ids.
+   *
+   * @param memberIds the ids of the current group members
+   * @return the member id chosen by the judge, or {@code null} if there are no members
+   */
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
new file mode 100644
index 0000000..2e7260b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
@@ -0,0 +1,211 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/**
+ * A ServerSet that delegates all calls to other ServerSets.
+ *
+ * <p>Joins and status changes are fanned out to every underlying server set, and monitors
+ * registered through {@link #watch(HostChangeMonitor)} observe the union of the hosts reported by
+ * all of them.
+ */
+public class CompoundServerSet implements ServerSet {
+  private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
+
+  private final List<ServerSet> serverSets;
+  // Last host set reported by each underlying server set; guarded by this.
+  private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap();
+  // Monitors to notify when the union of hosts changes; guarded by this.
+  private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList();
+  // Non-null once the underlying server sets are being watched; guarded by this.
+  private Command stopWatching = null;
+  // Union of the cached host sets as of the last notification; guarded by this.
+  private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of();
+
+  /**
+   * Create new ServerSet from a list of serverSets.
+   *
+   * @param serverSets serverSets to which the calls will be delegated.
+   */
+  public CompoundServerSet(Iterable<ServerSet> serverSets) {
+    MorePreconditions.checkNotBlank(serverSets);
+    this.serverSets = ImmutableList.copyOf(serverSets);
+  }
+
+  /** Joins a single underlying server set. */
+  private interface JoinOp {
+    EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException;
+  }
+
+  /** Applies a status change to a single underlying endpoint status. */
+  private interface StatusOp {
+    void changeStatus(EndpointStatus status) throws UpdateException;
+  }
+
+  /**
+   * Applies {@code statusOp} to every status, attempting all of them even if some fail.
+   *
+   * @param statuses the endpoint statuses obtained from the underlying server sets
+   * @param statusOp the change to apply to each status
+   * @throws UpdateException if at least one change failed; the message carries the stack traces
+   *     of every individual failure
+   */
+  private void changeStatus(
+      ImmutableList<EndpointStatus> statuses,
+      StatusOp statusOp) throws UpdateException {
+
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    int errorIdx = 1;
+    for (EndpointStatus endpointStatus : statuses) {
+      try {
+        statusOp.changeStatus(endpointStatus);
+      } catch (UpdateException exception) {
+        // Keep going so that one failing server set does not prevent updating the others.
+        builder.add(String.format("[%d] %s", errorIdx++,
+            Throwables.getStackTraceAsString(exception)));
+      }
+    }
+    if (errorIdx > 1) {
+      throw new UpdateException(
+          "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build()));
+    }
+  }
+
+  /**
+   * Joins every underlying server set with {@code joiner} and wraps the resulting statuses in a
+   * single compound status.
+   *
+   * @param joiner the join operation to run against each server set
+   * @return a status whose {@code leave} and {@code update} calls are applied to all joined sets
+   * @throws JoinException if any individual join fails; earlier joins are not rolled back
+   * @throws InterruptedException if interrupted while joining
+   */
+  private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException {
+    // Get the list of endpoint status from the serverSets.
+    ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder();
+    for (ServerSet serverSet : serverSets) {
+      builder.add(joiner.doJoin(serverSet));
+    }
+
+    final ImmutableList<EndpointStatus> statuses = builder.build();
+
+    return new EndpointStatus() {
+      @Override public void leave() throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.leave();
+          }
+        });
+      }
+
+      @Override public void update(final Status newStatus) throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.update(newStatus);
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Joins all underlying server sets with the given endpoints.
+   *
+   * <p>If any one of the serverSets throws an exception during its join, the exception is
+   * propagated and the serverSets in the composite can be left in a partially joined state.
+   */
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+        return serverSet.join(endpoint, additionalEndpoints);
+      }
+    });
+  }
+
+  /**
+   * If any one of the serverSet throws an exception during respective join, the exception is
+   * propagated. Join is successful only if all the joins are successful.
+   *
+   * <p>NOTE: If an exception occurs during the join, the serverSets in the composite can be in a
+   * partially joined state.
+   *
+   * @see ServerSet#join(InetSocketAddress, Map, Status)
+   */
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final Status status) throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, status);
+      }
+    });
+  }
+
+  /**
+   * Joins all underlying server sets with the given endpoints and shard id.
+   *
+   * <p>If any one of the serverSets throws an exception during its join, the exception is
+   * propagated and the serverSets in the composite can be left in a partially joined state.
+   */
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final int shardId) throws JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, shardId);
+      }
+    });
+  }
+
+  // Handles changes to the union of hosts.
+  private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) {
+    instanceCache.put(serverSet, hosts);
+
+    // Get the union of hosts.
+    ImmutableSet<ServiceInstance> currentHosts =
+        ImmutableSet.copyOf(Iterables.concat(instanceCache.values()));
+
+    // Check if the hosts have changed.
+    if (!currentHosts.equals(allHosts)) {
+      allHosts = currentHosts;
+
+      // Notify the monitors.
+      for (HostChangeMonitor<ServiceInstance> monitor : monitors) {
+        monitor.onChange(allHosts);
+      }
+    }
+  }
+
+  /**
+   * Monitor the CompoundServerSet.
+   *
+   * <p>The first call starts watching every underlying serverSet. Later calls reuse those watches:
+   * the additional monitor is registered and, if the current union of hosts is non-empty, is
+   * immediately told about it.
+   *
+   * <p>If any one of the monitor calls to the underlying serverSet raises a MonitorException, the
+   * exception is propagated. The call is successful only if all the monitor calls to the
+   * underlying serverSets are successful.
+   *
+   * <p>NOTE: If an exception occurs during the monitor call, the watches that were already
+   * established are stopped and the monitor is unregistered, so the serverSets in the composite
+   * will not be monitored.
+   *
+   * @param monitor HostChangeMonitor instance used to monitor host changes.
+   * @return A command that, when executed, will stop monitoring all underlying server sets. The
+   *     same command is shared by every monitor registered with this instance.
+   * @throws MonitorException If there was a problem monitoring any of the underlying server sets.
+   */
+  @Override
+  public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor)
+      throws MonitorException {
+    // Register first so that notifications fired while the underlying watches are being set up
+    // already reach this monitor.
+    monitors.add(monitor);
+
+    if (stopWatching != null) {
+      // Already watching: mirror what the first monitor saw, i.e. a notification only when the
+      // union differs from the initial empty set.
+      if (!allHosts.isEmpty()) {
+        monitor.onChange(allHosts);
+      }
+      return stopWatching;
+    }
+
+    ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder();
+    try {
+      for (final ServerSet serverSet : serverSets) {
+        commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() {
+          @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+            handleChange(serverSet, hostSet);
+          }
+        }));
+      }
+    } catch (MonitorException e) {
+      // Undo the partial setup so that a later retry starts from a clean slate instead of
+      // stacking duplicate watches and monitors.
+      monitors.remove(monitor);
+      for (Command stopPartialWatch : commandsBuilder.build()) {
+        stopPartialWatch.execute();
+      }
+      instanceCache.clear();
+      allHosts = ImmutableSet.of();
+      throw e;
+    }
+
+    stopWatching = Commands.compound(commandsBuilder.build());
+    return stopWatching;
+  }
+
+  /**
+   * Registers {@code monitor} exactly like {@link #watch(HostChangeMonitor)} but discards the
+   * command that stops the monitoring.
+   */
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    watch(monitor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..fdcd8d9
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
@@ -0,0 +1,42 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A lock that is coordinated between several processes.
+ *
+ * @author Florian Leibert
+ */
+public interface DistributedLock {
+
+  /**
+   * Acquires the lock.
+   *
+   * @throws LockingException if the lock could not be acquired
+   */
+  void lock() throws LockingException;
+
+  /**
+   * Attempts to acquire the lock, giving up after the given amount of time.
+   *
+   * @param timeout how long to wait for the lock
+   * @param unit the unit {@code timeout} is expressed in
+   * @return {@code true} if the lock was acquired, {@code false} otherwise
+   */
+  boolean tryLock(long timeout, TimeUnit unit);
+
+  /**
+   * Releases the lock.
+   *
+   * @throws LockingException if the lock could not be released
+   */
+  void unlock() throws LockingException;
+
+  /**
+   * Unchecked exception signalling a failure to acquire or release a {@link DistributedLock}.
+   */
+  class LockingException extends RuntimeException {
+    public LockingException(String message, Exception cause) {
+      super(message, cause);
+    }
+
+    public LockingException(String message) {
+      super(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
new file mode 100644
index 0000000..7669f92
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
@@ -0,0 +1,289 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import com.twitter.common.base.MorePreconditions;
+
+/**
+ * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
+ * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
+ * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
+ * in order, therefore the client with the least ID according to natural ordering will hold the
+ * lock. Every other client watches the id immediately preceding its own id and checks for the lock
+ * in case of notification. The client holding the lock does the work and finally deletes the node,
+ * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
+ * avoided in most cases because if a client drops dead while holding the lock, the ZK session
+ * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
+ * could occur if the worker thread on a client hangs but the zk-client thread is still alive.
+ * There could be an external monitor client that ensures that alerts are triggered if the least-id
+ * ephemeral node is present past a time-out.
+ * <p/>
+ * Threading model: {@link #lock()}, {@link #tryLock(long, TimeUnit)} and {@link #unlock()} hold
+ * this object's monitor for their whole duration, including while waiting. The watcher, which may
+ * run on the ZooKeeper event thread, therefore never takes that monitor: it only records the
+ * outcome of the attempt and releases the latch, and the waiting thread performs any cleanup.
+ * <p/>
+ * Note: Locking attempts will fail in case session expires!
+ *
+ * @author Florian Leibert
+ */
+@ThreadSafe
+public class DistributedLockImpl implements DistributedLock {
+
+  private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
+
+  private final ZooKeeperClient zkClient;
+  private final String lockPath;
+  private final ImmutableList<ACL> acl;
+
+  // Set by unlock() when an attempt is aborted and reset by cleanup(); nothing reads it yet.
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  // Released by the watcher once the current attempt has either succeeded or failed.
+  private CountDownLatch syncPoint;
+  private boolean holdsLock = false;
+  // Name of our ephemeral node (last path segment), or null when no attempt is in progress.
+  private String currentId;
+  // Full path of our ephemeral node, or null when no attempt is in progress.
+  private String currentNode;
+  // Full path of the node preceding ours that is currently being watched, if any.
+  private String watchedNode;
+  private LockWatcher watcher;
+
+  /**
+   * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default
+   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
+    this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Creates a distributed lock using the given {@code zkClient} to coordinate locking.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param lockPath The path used to manage the lock under.
+   * @param acl The acl to apply to newly created lock nodes.
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.lockPath = MorePreconditions.checkNotBlank(lockPath);
+    this.acl = ImmutableList.copyOf(acl);
+    this.syncPoint = new CountDownLatch(1);
+  }
+
+  /**
+   * Starts a new acquisition attempt by creating this client's ephemeral sequential node under
+   * the lock path and installing a fresh watcher.
+   */
+  private synchronized void prepare()
+    throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+    ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
+    LOG.log(Level.FINE, "Working with locking path:" + lockPath);
+
+    // Create an EPHEMERAL_SEQUENTIAL node.
+    currentNode =
+        zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+    // We only care about our actual id since we want to compare ourselves to siblings.
+    if (currentNode.contains("/")) {
+      currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1);
+    }
+    LOG.log(Level.FINE, "Received ID from zk:" + currentId);
+    this.watcher = new LockWatcher();
+  }
+
+  /**
+   * Blocks until the lock is acquired.
+   *
+   * @throws LockingException if the lock is already held by this instance, if the attempt fails,
+   *     or if the thread is interrupted while waiting (the interrupt status is restored)
+   */
+  @Override
+  public synchronized void lock() throws LockingException {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      watcher.checkForLock();
+      syncPoint.await();
+      if (!holdsLock) {
+        // The watcher reported a failure; remove our node so it does not linger in the queue.
+        cancelAttempt();
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      cancelAttempt();
+      // Restore the interrupt only after cleanup, which itself performs blocking ZooKeeper calls.
+      Thread.currentThread().interrupt();
+      throw new LockingException("InterruptedException while trying to acquire lock!", e);
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+  }
+
+  /**
+   * Tries to acquire the lock, waiting at most the given time.
+   *
+   * <p>When {@code false} is returned the attempt has been fully cancelled: this client's node is
+   * removed from the queue and {@link #unlock()} must not be called.
+   *
+   * @param timeout how long to wait for the lock
+   * @param unit the unit {@code timeout} is expressed in
+   * @return {@code true} if the lock was acquired, {@code false} if the wait timed out or the
+   *     thread was interrupted (the interrupt status is restored)
+   * @throws LockingException if the lock is already held by this instance or the attempt fails
+   */
+  @Override
+  public synchronized boolean tryLock(long timeout, TimeUnit unit) {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      watcher.checkForLock();
+      boolean success = syncPoint.await(timeout, unit);
+      if (!success) {
+        // Give up our place in the queue; otherwise the abandoned node would eventually become
+        // the lock holder and a later attempt would queue up behind it.
+        cancelAttempt();
+        return false;
+      }
+      if (!holdsLock) {
+        // The watcher reported a failure; remove our node so it does not linger in the queue.
+        cancelAttempt();
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      cancelAttempt();
+      // Restore the interrupt only after cleanup, which itself performs blocking ZooKeeper calls.
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+    return true;
+  }
+
+  /**
+   * Releases the lock, or aborts a pending acquisition, by deleting this client's node.
+   *
+   * @throws LockingException if there is neither a held lock nor a pending attempt
+   */
+  @Override
+  public synchronized void unlock() throws LockingException {
+    if (currentId == null) {
+      throw new LockingException("Error, neither attempting to lock nor holding a lock!");
+    }
+    // Try aborting!
+    if (!holdsLock) {
+      aborted.set(true);
+      LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
+    } else {
+      LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
+    }
+    // In both cases our node has to go, otherwise it keeps its place in the queue.
+    cleanup();
+  }
+
+  //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
+
+  /**
+   * Abandons the current attempt: deletes our node, resets the state and releases the latch the
+   * attempt was waiting on. Only called by the thread that owns this object's monitor.
+   */
+  private synchronized void cancelAttempt() {
+    LOG.log(Level.INFO, "Cancelling lock attempt!");
+    // cleanup() installs a fresh latch for the next attempt, so remember the current one.
+    CountDownLatch attemptLatch = syncPoint;
+    try {
+      cleanup();
+    } finally {
+      // Bubble up failure...
+      holdsLock = false;
+      attemptLatch.countDown();
+    }
+  }
+
+  /**
+   * Marks the current attempt as failed and wakes up the waiting thread, which then cleans up.
+   * Deliberately not synchronized on this object: the waiting thread holds that monitor.
+   */
+  private void signalFailure() {
+    holdsLock = false;
+    syncPoint.countDown();
+  }
+
+  /**
+   * Deletes our node if one was created and resets all per-attempt state. The state is left
+   * untouched when the deletion fails so that the caller can retry.
+   */
+  private void cleanup() {
+    LOG.info("Cleaning up!");
+    String node = currentNode;
+    try {
+      // The node is absent when the attempt failed before it could be created.
+      Stat stat = node == null ? null : zkClient.get().exists(node, false);
+      if (stat != null) {
+        zkClient.get().delete(node, ZooKeeperUtils.ANY_VERSION);
+      } else {
+        LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    holdsLock = false;
+    aborted.set(false);
+    currentId = null;
+    currentNode = null;
+    // Forget the watched node so that watches left over from this attempt are ignored.
+    watchedNode = null;
+    watcher = null;
+    syncPoint = new CountDownLatch(1);
+  }
+
+  /**
+   * Watches the node preceding ours and re-evaluates lock ownership whenever it disappears.
+   */
+  class LockWatcher implements Watcher {
+
+    /**
+     * Checks whether our node is first in line. If so the lock is marked as held and the waiting
+     * thread is released; otherwise a watch is set on the node immediately preceding ours. Any
+     * failure marks the attempt as failed and releases the waiting thread.
+     */
+    public synchronized void checkForLock() {
+      MorePreconditions.checkNotBlank(currentId);
+
+      try {
+        List<String> candidates = zkClient.get().getChildren(lockPath, null);
+        ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
+
+        int memberIndex = sortedMembers.indexOf(currentId);
+
+        if (memberIndex < 0) {
+          // Our own node is gone (this also covers an empty member list), so there is no
+          // predecessor to wait for and the attempt cannot succeed.
+          LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+              "is not among the members %s. Trying to cancel lock acquisition.",
+              currentId, sortedMembers));
+          signalFailure();
+        } else if (memberIndex == 0) {
+          // If we hold the lock
+          holdsLock = true;
+          syncPoint.countDown();
+        } else {
+          final String nextLowestNode = sortedMembers.get(memberIndex - 1);
+          LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " +
+              "waiting for [%s] to release lock.", currentId, nextLowestNode));
+
+          watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
+          Stat stat = zkClient.get().exists(watchedNode, this);
+          if (stat == null) {
+            // The predecessor vanished before the watch was set; look again.
+            checkForLock();
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got interrupted. Trying to cancel lock acquisition.", currentId), e);
+        signalFailure();
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got a KeeperException. Trying to cancel lock acquisition.", currentId), e);
+        signalFailure();
+      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e);
+        signalFailure();
+      }
+    }
+
+    @Override
+    public synchronized void process(WatchedEvent event) {
+      // A watcher that belongs to an attempt which has since been cleaned up must not interfere
+      // with the current state.
+      if (watcher != this) {
+        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
+        return;
+      }
+      //TODO(Florian Leibert): Pull this into the outer class.
+      // Session state changes carry no path, so they have to be handled before the path check.
+      if (event.getType() == Watcher.Event.EventType.None) {
+        switch (event.getState()) {
+          case SyncConnected:
+            // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
+            LOG.info("Reconnected...");
+            break;
+          case Expired:
+            LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId));
+            signalFailure();
+            break;
+          default:
+            break;
+        }
+        return;
+      }
+      // this handles the case where we have aborted a lock and deleted ourselves but still have a
+      // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
+      if (watchedNode == null || !watchedNode.equals(event.getPath())) {
+        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
+        return;
+      }
+      if (event.getType() == Event.EventType.NodeDeleted) {
+        checkForLock();
+      } else {
+        LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
new file mode 100644
index 0000000..81c451c
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
@@ -0,0 +1,711 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.base.ExceptionalSupplier;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.util.BackoffHelper;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups.  The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+  private static final Logger LOG = Logger.getLogger(Group.class.getName());
+
+  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+  private final ZooKeeperClient zkClient;
+  private final ImmutableList<ACL> acl;
+  private final String path;
+
+  private final NodeScheme nodeScheme;
+  private final Predicate<String> nodeNameFilter;
+
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a group rooted at the given {@code path}.  The path must be absolute; trailing and
+   * duplicate slashes are normalized away, so each of the following yields a group at the
+   * normalized path /my/distributed/group:
+   * <ul>
+   *   <li>/my/distributed/group
+   *   <li>/my/distributed/group/
+   *   <li>/my/distributed//group
+   * </ul>
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the absolute persistent path that represents this group
+   * @param nodeScheme the scheme that defines how nodes are created
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+    Preconditions.checkNotNull(zkClient);
+    this.zkClient = zkClient;
+    this.acl = ImmutableList.copyOf(acl);
+
+    Preconditions.checkNotNull(path);
+    this.path = ZooKeeperUtils.normalizePath(path);
+
+    Preconditions.checkNotNull(nodeScheme);
+    this.nodeScheme = nodeScheme;
+
+    // Selects the children of the group path that the node scheme recognizes as members.
+    this.nodeNameFilter = new Predicate<String>() {
+      @Override public boolean apply(String candidateName) {
+        return Group.this.nodeScheme.isMember(candidateName);
+      }
+    };
+
+    this.backoffHelper = new BackoffHelper();
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
+   * {@code namePrefix} of 'member_'.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the absolute persistent path that represents this group
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
+   * {@link DefaultScheme} using {@code namePrefix}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the absolute persistent path that represents this group
+   * @param namePrefix the prefix handed to the {@link DefaultScheme}
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+    this(zkClient, acl, path, new DefaultScheme(namePrefix));
+  }
+
+  public String getMemberPath(String memberId) {
+    return path + "/" + MorePreconditions.checkNotBlank(memberId);
+  }
+
+  /**
+   * Returns the normalized ZooKeeper path this group is rooted at.
+   */
+  public String getPath() {
+    return path;
+  }
+
+  public String getMemberId(String nodePath) {
+    MorePreconditions.checkNotBlank(nodePath);
+    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
+        "Not a member of this group[%s]: %s", path, nodePath);
+
+    String memberId = StringUtils.substringAfterLast(nodePath, "/");
+    Preconditions.checkArgument(nodeScheme.isMember(memberId),
+        "Not a group member: %s", memberId);
+    return memberId;
+  }
+
+  /**
+   * Queries ZooKeeper synchronously for the children of the group path and returns those that
+   * are group members.
+   *
+   * @return the ids of all the present members of this group
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this group's member ids
+   * @throws InterruptedException if this thread is interrupted listing the group members
+   */
+  public Iterable<String> getMemberIds()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    List<String> childNames = zkClient.get().getChildren(path, false);
+    // Children that the node scheme does not recognize as members are left out.
+    return Iterables.filter(childNames, nodeNameFilter);
+  }
+
+  /**
+   * Gets the data for one of this group's members by querying ZooKeeper synchronously.
+   *
+   * @param memberId the id of the member whose data to retrieve
+   * @return the data associated with the {@code memberId}
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading this member's data
+   * @throws InterruptedException if this thread is interrupted retrieving the member data
+   */
+  public byte[] getMemberData(String memberId)
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+    // No watch is set and no Stat is requested; only the node's data is read.
+    return zkClient.get().getData(getMemberPath(memberId), false, null);
+  }
+
+  /**
+   * Represents membership in a distributed group.
+   */
+  public interface Membership {
+
+    /**
+     * Returns the persistent ZooKeeper path that represents this group.
+     */
+    String getGroupPath();
+
+    /**
+     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberId();
+
+    /**
+     * Returns the full ZooKeeper path to this group member.  May change over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberPath();
+
+    /**
+     * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
+     * {@link Group#join(Supplier, Command)}.
+     *
+     * @return the new membership data
+     * @throws UpdateException if there was a problem updating the membership data
+     */
+    byte[] updateMemberData() throws UpdateException;
+
+    /**
+     * Cancels group membership by deleting the associated ZooKeeper member node.
+     *
+     * @throws JoinException if there is a problem deleting the node
+     */
+    void cancel() throws JoinException;
+  }
+
+  /**
+   * Indicates an error joining a group.
+   */
+  public static class JoinException extends Exception {
+    /**
+     * @param message a description of what went wrong
+     * @param cause the underlying failure
+     */
+    public JoinException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error updating a group member's data.
+   */
+  public static class UpdateException extends Exception {
+    /**
+     * @param message a description of what went wrong
+     * @param cause the underlying failure
+     */
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Joins this group without any member data and without a membership-loss callback.
+   *
+   * <p>Equivalent to calling {@link #join(Supplier, Command)} with a supplier of {@code null}
+   * data and a {@code null} callback.  Note that the supplier itself is not {@code null}.
+   */
+  public final Membership join() throws JoinException, InterruptedException {
+    return join(NO_MEMBER_DATA, null);
+  }
+
+  /**
+   * Equivalent to calling {@code join(memberData, null)}, i.e. joining with the given member data
+   * and no membership-loss callback.
+   *
+   * @param memberData a supplier of the data to store in the member node
+   */
+  public final Membership join(Supplier<byte[]> memberData)
+      throws JoinException, InterruptedException {
+
+    return join(memberData, null);
+  }
+
+  /**
+   * Joins this group without any member data.
+   *
+   * <p>Equivalent to calling {@link #join(Supplier, Command)} with a supplier of {@code null}
+   * data and the given {@code onLoseMembership}.  Note that the supplier itself is not
+   * {@code null}.
+   *
+   * @param onLoseMembership a callback to notify when membership is lost
+   */
+  public final Membership join(@Nullable final Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    return join(NO_MEMBER_DATA, onLoseMembership);
+  }
+
+  /**
+   * Joins this group and returns the resulting Membership when successful.  Membership is
+   * cancelled automatically when the current jvm process dies, but the returned Membership can
+   * be used to cancel it earlier.  Unless
+   * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will
+   * be maintained by re-establishing it silently in the background.
+   *
+   * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
+   * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
+   * membership in the group.
+   *
+   * @param memberData a supplier of the data to store in the member node
+   * @param onLoseMembership a callback to notify when membership is lost
+   * @return a Membership object with the member details
+   * @throws JoinException if there was a problem joining the group
+   * @throws InterruptedException if this thread is interrupted awaiting completion of the join
+   */
+  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    Preconditions.checkNotNull(memberData);
+    ensurePersistentGroupPath();
+
+    final ActiveMembership pendingMembership =
+        new ActiveMembership(memberData, onLoseMembership);
+
+    // A null result marks the attempt as a temporary failure; presumably the backoff helper then
+    // tries again.
+    ExceptionalSupplier<Membership, JoinException> joinAttempt =
+        new ExceptionalSupplier<Membership, JoinException>() {
+          @Override public Membership get() throws JoinException {
+            try {
+              return pendingMembership.join();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new JoinException("Interrupted trying to join group at path: " + path, e);
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
+              return null;
+            } catch (KeeperException e) {
+              if (!zkClient.shouldRetry(e)) {
+                throw new JoinException("Problem joining partition group at path: " + path, e);
+              }
+              LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
+              return null;
+            }
+          }
+        };
+
+    return backoffHelper.doUntilResult(joinAttempt);
+  }
+
+  /**
+   * Makes sure the group's path exists in ZooKeeper, retrying through the backoff helper on
+   * connection problems and retryable ZooKeeper errors.
+   *
+   * @throws JoinException if the path could not be ensured due to a non-retryable error or an
+   *     interruption inside an attempt
+   * @throws InterruptedException if this thread is interrupted while waiting between attempts
+   */
+  private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+    ExceptionalSupplier<Boolean, JoinException> ensureAttempt =
+        new ExceptionalSupplier<Boolean, JoinException>() {
+          @Override public Boolean get() throws JoinException {
+            try {
+              ZooKeeperUtils.ensurePath(zkClient, acl, path);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+              return false;
+            } catch (KeeperException e) {
+              if (!zkClient.shouldRetry(e)) {
+                throw new JoinException("Problem ensuring group at path: " + path, e);
+              }
+              LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e);
+              return false;
+            }
+            return true;
+          }
+        };
+    backoffHelper.doUntilSuccess(ensureAttempt);
+  }
+
+  /**
+   * The {@link Membership} handed out by {@code join}.  It creates this process's ephemeral
+   * member node and, until {@link #cancel()} succeeds, re-creates it (after notifying
+   * {@code onLoseMembership}) when the session expires or the node is deleted.
+   *
+   * <p>All methods that read or change membership state are {@code synchronized} on this
+   * instance.
+   */
+  private class ActiveMembership implements Membership {
+    private final Supplier<byte[]> memberData;
+    private final Command onLoseMembership;
+    private String nodePath;
+    private String memberId;
+    private volatile boolean cancelled;
+    private byte[] membershipData;
+
+    // Tracks registration of the session expiration handler separately from nodePath: nodePath
+    // stays null across failed (retried) join attempts, so using it as the guard would register
+    // one handler per attempt.  Only accessed from synchronized join().
+    private boolean expirationHandlerRegistered;
+
+    /**
+     * @param memberData a supplier of the data to store in the member node
+     * @param onLoseMembership a callback to notify when membership is lost; a no-op command is
+     *     substituted when null
+     */
+    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+      this.memberData = memberData;
+      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
+    }
+
+    @Override
+    public String getGroupPath() {
+      return path;
+    }
+
+    @Override
+    public synchronized String getMemberId() {
+      return memberId;
+    }
+
+    @Override
+    public synchronized String getMemberPath() {
+      return nodePath;
+    }
+
+    /**
+     * Fetches fresh data from the supplier and writes it to the member node if it differs from
+     * the data last written.
+     *
+     * @return the data obtained from the supplier, whether or not a write was needed
+     * @throws UpdateException if the write fails or this thread is interrupted; on interruption
+     *     the thread's interrupt status is restored before throwing
+     */
+    @Override
+    public synchronized byte[] updateMemberData() throws UpdateException {
+      byte[] updatedData = memberData.get();
+      if (!ArrayUtils.isEquals(this.membershipData, updatedData)) {
+        try {
+          zkClient.get().setData(nodePath, updatedData, ZooKeeperUtils.ANY_VERSION);
+          this.membershipData = updatedData;
+        } catch (KeeperException e) {
+          throw new UpdateException("Problem updating membership data.", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so callers further up the stack can observe it.
+          Thread.currentThread().interrupt();
+          throw new UpdateException("Interrupted attempting to update membership data.", e);
+        } catch (ZooKeeperConnectionException e) {
+          throw new UpdateException(
+              "Could not connect to the ZooKeeper cluster to update membership data.", e);
+        }
+      }
+      return updatedData;
+    }
+
+    /**
+     * Deletes the member node, retrying through the backoff helper on connection problems and
+     * retryable ZooKeeper errors.  A node that is already gone counts as success.  Does nothing
+     * if this membership was already cancelled.
+     *
+     * @throws JoinException if the node could not be deleted or this thread was interrupted
+     */
+    @Override
+    public synchronized void cancel() throws JoinException {
+      if (!cancelled) {
+        try {
+          backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
+            @Override public Boolean get() throws JoinException {
+              try {
+                zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+                return true;
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+              } catch (ZooKeeperConnectionException e) {
+                LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+                return false;
+              } catch (NoNodeException e) {
+                LOG.info("Membership already cancelled, node at path: " + nodePath +
+                         " has been deleted");
+                return true;
+              } catch (KeeperException e) {
+                if (zkClient.shouldRetry(e)) {
+                  LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e);
+                  return false;
+                } else {
+                  throw new JoinException("Problem cancelling membership: " + nodePath, e);
+                }
+              }
+            }
+          });
+          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Problem cancelling membership: " + nodePath, e);
+        }
+      }
+    }
+
+    /** Thrown by {@link #join()} when this membership has already been cancelled. */
+    private class CancelledException extends IllegalStateException { /* marker */ }
+
+    /**
+     * Makes a single attempt to create the member node and to watch it for deletion.
+     *
+     * @return this membership
+     * @throws CancelledException if this membership has been cancelled
+     * @throws ZooKeeperConnectionException if the ZooKeeper client could not be obtained
+     * @throws InterruptedException if interrupted while talking to ZooKeeper
+     * @throws KeeperException if ZooKeeper rejects the create or exists call
+     */
+    synchronized Membership join()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (cancelled) {
+        throw new CancelledException();
+      }
+
+      if (!expirationHandlerRegistered) {
+        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+        // registered once, even when this attempt fails below and join() is retried.
+        zkClient.registerExpirationHandler(new Command() {
+          @Override public void execute() {
+            tryJoin();
+          }
+        });
+        expirationHandlerRegistered = true;
+      }
+
+      byte[] membershipData = memberData.get();
+      String nodeName = nodeScheme.createName(membershipData);
+      CreateMode createMode = nodeScheme.isSequential()
+          ? CreateMode.EPHEMERAL_SEQUENTIAL
+          : CreateMode.EPHEMERAL;
+      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+      memberId = Group.this.getMemberId(nodePath);
+      LOG.info("Set group member ID to " + memberId);
+      this.membershipData = membershipData;
+
+      // Re-join if our ephemeral node goes away due to maliciousness.
+      zkClient.get().exists(nodePath, new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          if (event.getType() == EventType.NodeDeleted) {
+            tryJoin();
+          }
+        }
+      });
+
+      return this;
+    }
+
+    // One re-join attempt for the backoff helper: true ends the retry loop (joined, or the
+    // membership was cancelled in the meantime), false requests another attempt.
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              join();
+              return true;
+            } catch (CancelledException e) {
+              // Lost a cancel race - that's ok.
+              return true;
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+              return false;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e);
+                return false;
+              } else {
+                throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+              }
+            }
+          }
+        };
+
+    /**
+     * Notifies {@code onLoseMembership} and then re-joins the group, retrying through the backoff
+     * helper until a join succeeds or the membership is found to be cancelled.
+     */
+    private synchronized void tryJoin() {
+      onLoseMembership.execute();
+      try {
+        backoffHelper.doUntilSuccess(tryJoin);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
+      }
+    }
+  }
+
+  /**
+   * An interface to an object that listens for changes to a group's membership.  Listeners are
+   * registered through {@code watch}.
+   */
+  public interface GroupChangeListener {
+
+    /**
+     * Called whenever group membership changes with the new list of member ids.  The first call
+     * is made before {@code watch} returns; it is only called again when the set of member ids
+     * differs from the one last reported.
+     *
+     * @param memberIds the current member ids
+     */
+    void onGroupChange(Iterable<String> memberIds);
+  }
+
+  /**
+   * An interface that dictates the scheme to use for storing and filtering nodes that represent
+   * members of a distributed group.
+   */
+  public interface NodeScheme {
+    /**
+     * Determines if a child node is a member of a group by examining the node's name.
+     *
+     * @param nodeName the name of a child node found in a group
+     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+     */
+    boolean isMember(String nodeName);
+
+    /**
+     * Generates a node name for the node representing this process in the distributed group.
+     * The member node is created at the group path followed by {@code "/"} and this name.
+     *
+     * @param membershipData the data that will be stored in this node
+     * @return the name for the node that will represent this process in the group
+     */
+    String createName(byte[] membershipData);
+
+    /**
+     * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+     * Member nodes are created with {@code CreateMode.EPHEMERAL_SEQUENTIAL} when this returns
+     * {@code true} and with {@code CreateMode.EPHEMERAL} otherwise.
+     *
+     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+     */
+    boolean isSequential();
+  }
+
+  /**
+   * Indicates an error watching a group.
+   */
+  public static class WatchException extends Exception {
+    /**
+     * Creates a watch exception.
+     *
+     * @param message a description of the failed watch operation
+     * @param cause the underlying exception that caused the failure
+     */
+    public WatchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Watches this group for the lifetime of this jvm process, or until the returned command is
+   * executed.  This method will block until the current group members are available, notify the
+   * {@code groupChangeListener} and then return.  All further changes to the group membership
+   * will cause notifications on a background thread.
+   *
+   * <p>Connection problems and retryable ZooKeeper errors while setting up the initial watch are
+   * logged and retried through the backoff helper.
+   *
+   * @param groupChangeListener the listener to notify of group membership change events
+   * @return A command which, when executed, will stop watching the group.
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+   */
+  public final Command watch(final GroupChangeListener groupChangeListener)
+      throws WatchException, InterruptedException {
+    Preconditions.checkNotNull(groupChangeListener);
+
+    try {
+      ensurePersistentGroupPath();
+    } catch (JoinException e) {
+      throw new WatchException("Failed to create group path: " + path, e);
+    }
+
+    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() {
+      @Override public Boolean get() throws WatchException {
+        try {
+          groupMonitor.watchGroup();
+          return true;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+        } catch (ZooKeeperConnectionException e) {
+          LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
+          // Signal "retry" with false, like every other doUntilSuccess supplier in this class,
+          // rather than a null Boolean.
+          return false;
+        } catch (KeeperException e) {
+          if (zkClient.shouldRetry(e)) {
+            LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
+            return false;
+          } else {
+            throw new WatchException("Problem trying to watch group at path: " + path, e);
+          }
+        }
+      }
+    });
+    return new Command() {
+      @Override public void execute() {
+        groupMonitor.stopWatching();
+      }
+    };
+  }
+
+  /**
+   * Helps continuously monitor a group for membership changes.  Each call to
+   * {@code watchGroup} reads the group's children with {@code groupWatcher} attached and reports
+   * the filtered member list to the listener when it differs from the one last reported.
+   */
+  private class GroupMonitor {
+    private final GroupChangeListener groupChangeListener;
+    // Set by stopWatching(); checked before re-watching and before notifying the listener.
+    private volatile boolean stopped = false;
+    // The membership last reported to the listener; null until the first report.
+    private Set<String> members;
+
+    /**
+     * @param groupChangeListener the listener to notify of group membership changes
+     */
+    GroupMonitor(GroupChangeListener groupChangeListener) {
+      this.groupChangeListener = groupChangeListener;
+    }
+
+    // Re-reads (and thereby re-watches) the group whenever its children change.  Other event
+    // types are ignored here.
+    private final Watcher groupWatcher = new Watcher() {
+      @Override public final void process(WatchedEvent event) {
+        if (event.getType() == EventType.NodeChildrenChanged) {
+          tryWatchGroup();
+        }
+      }
+    };
+
+    // One re-watch attempt for the backoff helper: true ends the retry loop, false requests
+    // another attempt, and a non-retryable ZooKeeper error aborts with an unchecked exception.
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              watchGroup();
+              return true;
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
+              return false;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e);
+                return false;
+              } else {
+                throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+              }
+            }
+          }
+        };
+
+    /**
+     * Re-establishes the watch on the group, retrying through the backoff helper, unless
+     * watching has been stopped.  An interruption restores the thread's interrupt status and is
+     * rethrown as a {@code RuntimeException}.
+     */
+    private void tryWatchGroup() {
+      if (stopped) {
+        return;
+      }
+
+      try {
+        backoffHelper.doUntilSuccess(tryWatchGroup);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+      }
+    }
+
+    /**
+     * Makes a single attempt to read the group's children with {@code groupWatcher} attached and
+     * passes the children accepted by {@code nodeNameFilter} on to {@code setMembers}.  Does
+     * nothing once watching has been stopped.
+     *
+     * @throws ZooKeeperConnectionException if the ZooKeeper client could not be obtained
+     * @throws InterruptedException if interrupted while talking to ZooKeeper
+     * @throws KeeperException if ZooKeeper rejects the getChildren call
+     */
+    private void watchGroup()
+        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+      if (stopped) {
+        return;
+      }
+
+      List<String> children = zkClient.get().getChildren(path, groupWatcher);
+      setMembers(Iterables.filter(children, nodeNameFilter));
+    }
+
+    /**
+     * Stops further re-watching and listener notifications.  The ZooKeeper watch itself is not
+     * removed; later events are simply ignored via the {@code stopped} flag.
+     */
+    private void stopWatching() {
+      // TODO(William Farner): Cancel the watch when
+      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+      LOG.info("Stopping watch on " + this);
+      stopped = true;
+    }
+
+    /**
+     * Records the given membership and notifies the listener if it differs from the membership
+     * last reported.  On the first call this also registers a session expiration handler that
+     * re-watches the group.
+     *
+     * @param members the member node names currently in the group
+     */
+    synchronized void setMembers(Iterable<String> members) {
+      if (stopped) {
+        LOG.info("Suppressing membership update, no longer watching " + this);
+        return;
+      }
+
+      if (this.members == null) {
+        // Reset our watch on the group if session expires - only needs to be registered once.
+        zkClient.registerExpirationHandler(new Command() {
+          @Override public void execute() {
+            tryWatchGroup();
+          }
+        });
+      }
+
+      // Snapshot the members so the comparison and the stored state are unaffected by the
+      // iterable passed in.
+      Set<String> membership = ImmutableSet.copyOf(members);
+      if (!membership.equals(this.members)) {
+        // The listener is invoked while this monitor's lock is held, before the new membership
+        // is stored.
+        groupChangeListener.onGroupChange(members);
+        this.members = membership;
+      }
+    }
+  }
+
+  /**
+   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+   * prefix is "member_", the node's full path will look something like
+   * {@code /discovery/servicename/member_0000000007}.
+   */
+  public static class DefaultScheme implements NodeScheme {
+    private final String namePrefix;
+    private final Pattern namePattern;
+
+    /**
+     * Creates a sequential node scheme based on the given node name prefix.
+     *
+     * @param namePrefix the prefix for the names of the member nodes
+     */
+    public DefaultScheme(String namePrefix) {
+      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+    }
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return namePattern.matcher(nodeName).matches();
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return namePrefix;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return true;
+    }
+  }
+
+  /** Returns {@code "Group "} followed by this group's ZooKeeper path. */
+  @Override
+  public String toString() {
+    return "Group " + path;
+  }
+}