You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/05 04:42:20 UTC

aurora git commit: Extract a SingletonService interface.

Repository: aurora
Updated Branches:
  refs/heads/master f402899bc -> f729fd4d2


Extract a SingletonService interface.

This makes space for introducing an Apache Curator implementation.

Bugs closed: AURORA-1468

Reviewed at https://reviews.apache.org/r/45723/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f729fd4d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f729fd4d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f729fd4d

Branch: refs/heads/master
Commit: f729fd4d2a57e6a83904d680b59429aa5daf9a78
Parents: f402899
Author: John Sirois <js...@apache.org>
Authored: Mon Apr 4 20:42:17 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Mon Apr 4 20:42:17 2016 -0600

----------------------------------------------------------------------
 .../common/zookeeper/SingletonService.java      | 133 +++-------
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ++++++++++
 .../zookeeper/SingletonServiceImplTest.java     | 243 ++++++++++++++++++
 .../common/zookeeper/SingletonServiceTest.java  | 244 -------------------
 .../aurora/scheduler/SchedulerLifecycle.java    |  14 +-
 .../aurora/scheduler/app/SchedulerMain.java     |   7 +-
 .../scheduler/app/ServiceDiscoveryModule.java   |   5 +-
 .../scheduler/SchedulerLifecycleTest.java       |   2 +-
 8 files changed, 415 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
index 20accd2..3561d07 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -15,56 +15,38 @@ package org.apache.aurora.common.zookeeper;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
 
 /**
- * A service that uses master election to only allow a single instance of the server to join
- * the {@link ServerSet} at a time.
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
  */
-public class SingletonService {
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+public interface SingletonService {
 
   /**
-   * Creates a candidate that can be combined with an existing server set to form a singleton
-   * service using {@link #SingletonService(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a single zk path.
+   * Indicates an error attempting to lead a group of servers.
    */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+  class LeadException extends Exception {
+    public LeadException(String message, Throwable cause) {
+      super(message, cause);
+    }
   }
 
-  private final ServerSet serverSet;
-  private final Candidate candidate;
+  /**
+   * Indicates an error attempting to advertise leadership of a group of servers.
+   */
+  class AdvertiseException extends Exception {
+    public AdvertiseException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
 
   /**
-   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
+   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
    */
-  public SingletonService(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
+  class LeaveException extends Exception {
+    public LeaveException(String message, Throwable cause) {
+      super(message, cause);
+    }
   }
 
   /**
@@ -73,55 +55,20 @@ public class SingletonService {
    * @param endpoint The primary endpoint to register as a leader candidate in the service.
    * @param additionalEndpoints Additional endpoints that are available on the host.
    * @param listener Handler to call when the candidate is elected or defeated.
-   * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
-   * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
    * @throws InterruptedException If the thread watching/joining the group was interrupted.
    */
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws Group.WatchException, Group.JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    candidate.offerLeadership(new Leader() {
-      private ServerSet.EndpointStatus endpointStatus = null;
-      @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
-        listener.onLeading(new LeaderControl() {
-          ServerSet.EndpointStatus endpointStatus = null;
-          final AtomicBoolean left = new AtomicBoolean(false);
-
-          // Methods are synchronized to prevent simultaneous invocations.
-          @Override public synchronized void advertise()
-              throws JoinException, InterruptedException {
-
-            Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
-            Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
-            endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-          }
-
-          @Override public synchronized void leave() throws ServerSet.UpdateException, JoinException {
-            Preconditions.checkState(left.compareAndSet(false, true),
-                "Cannot leave more than once.");
-            if (endpointStatus != null) {
-              endpointStatus.leave();
-            }
-            abdicate.execute();
-          }
-        });
-      }
-
-      @Override public void onDefeated() {
-        listener.onDefeated(endpointStatus);
-      }
-    });
-  }
+  void lead(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      LeadershipListener listener)
+      throws LeadException, InterruptedException;
 
   /**
    * A listener to be notified of changes in the leadership status.
    * Implementers should be careful to avoid blocking operations in these callbacks.
    */
-  public interface LeadershipListener {
+  interface LeadershipListener {
 
     /**
      * Notifies the listener that is is current leader.
@@ -131,35 +78,33 @@ public class SingletonService {
     void onLeading(LeaderControl control);
 
     /**
-     * Notifies the listener that it is no longer leader.  The leader should take this opportunity
-     * to remove its advertisement gracefully.
-     *
-     * @param status A handle on the endpoint status for the advertised leader.
+     * Notifies the listener that it is no longer leader.
      */
-    void onDefeated(@Nullable ServerSet.EndpointStatus status);
+    void onDefeated();
   }
 
   /**
    * A controller for the state of the leader.  This will be provided to the leader upon election,
-   * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and
-   * terminate leadership at will.
+   * which allows the leader to decide when to advertise as leader of the server set and terminate
+   * leadership at will.
    */
-  public interface LeaderControl {
+  interface LeaderControl {
 
     /**
      * Advertises the leader's server presence to clients.
      *
-     * @throws JoinException If there was an error advertising.
+     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+     *     of the server set.
      * @throws InterruptedException If interrupted while advertising.
      */
-    void advertise() throws JoinException, InterruptedException;
+    void advertise() throws AdvertiseException, InterruptedException;
 
     /**
      * Leaves candidacy for leadership, removing advertised server presence if applicable.
      *
-     * @throws ServerSet.UpdateException If the leader's status could not be updated.
-     * @throws JoinException If there was an error abdicating from leader election.
+     * @throws LeaveException If the leader's status could not be updated or there was an error
+     *     abdicating server set leadership.
      */
-    void leave() throws ServerSet.UpdateException, JoinException;
+    void leave() throws LeaveException;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+public class SingletonServiceImpl implements SingletonService {
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  /**
+   * Creates a candidate that can be combined with an existing server set to form a singleton
+   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+  }
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
+   * advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election.
+   * @param candidate The candidacy to use to vie for election.
+   */
+  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+    this.serverSet = Preconditions.checkNotNull(serverSet);
+    this.candidate = Preconditions.checkNotNull(candidate);
+  }
+
+  @Override
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final LeadershipListener listener)
+                   throws LeadException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    try {
+      candidate.offerLeadership(new Leader() {
+        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+          listener.onLeading(new LeaderControl() {
+            ServerSet.EndpointStatus endpointStatus = null;
+            final AtomicBoolean left = new AtomicBoolean(false);
+
+            // Methods are synchronized to prevent simultaneous invocations.
+            @Override public synchronized void advertise()
+                throws AdvertiseException, InterruptedException {
+
+              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+              try {
+                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+              } catch (JoinException e) {
+                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+              }
+            }
+
+            @Override public synchronized void leave() throws LeaveException {
+              Preconditions.checkState(left.compareAndSet(false, true),
+                  "Cannot leave more than once.");
+              if (endpointStatus != null) {
+                try {
+                  endpointStatus.leave();
+                } catch (ServerSet.UpdateException e) {
+                  throw new LeaveException("Problem updating endpoint status for abdicating leader " +
+                      "at endpoint " + endpoint, e);
+                }
+              }
+              try {
+                abdicate.execute();
+              } catch (JoinException e) {
+                throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
+              }
+            }
+          });
+        }
+
+        @Override public void onDefeated() {
+          listener.onDefeated();
+        }
+      });
+    } catch (JoinException e) {
+      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+    } catch (Group.WatchException e) {
+      throw new LeadException("Problem getting initial membership list for leadership group.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..82df845
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+public class SingletonServiceImplTest extends BaseZooKeeperTest {
+  private static final int PORT_A = 1234;
+  private static final int PORT_B = 8080;
+  private static final InetSocketAddress PRIMARY_ENDPOINT =
+      InetSocketAddress.createUnresolved("foo", PORT_A);
+  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+  private IMocksControl control;
+  private SingletonServiceImpl.LeadershipListener listener;
+  private ServerSet serverSet;
+  private ServerSet.EndpointStatus endpointStatus;
+  private Candidate candidate;
+  private ExceptionalCommand<Group.JoinException> abdicate;
+
+  private SingletonService service;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void mySetUp() throws IOException {
+    control = createControl();
+    addTearDown(control::verify);
+    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+    serverSet = control.createMock(ServerSet.class);
+    candidate = control.createMock(Candidate.class);
+    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+    abdicate = control.createMock(ExceptionalCommand.class);
+
+    service = new SingletonServiceImpl(serverSet, candidate);
+  }
+
+  private void newLeader(
+      final String hostName,
+      Capture<Leader> leader,
+      LeadershipListener listener) throws Exception {
+
+    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+        listener);
+
+    // This actually elects the leader.
+    leader.getValue().onElected(abdicate);
+  }
+
+  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+    newLeader(hostName, leader, listener);
+  }
+
+  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+  }
+
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void teatLeadLeaveNoAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    abdicate.execute();
+
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadJoinFailure() throws Exception {
+    Capture<Leader> leaderCapture = new Capture<Leader>();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+
+    try {
+      controlCapture.getValue().advertise();
+      fail("Join should have failed.");
+    } catch (SingletonService.AdvertiseException e) {
+      // Expected.
+    }
+
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAdvertiseAfterLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test
+  public void testLeadMulti() throws Exception {
+    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+    for (int i = 0; i < 5; i++) {
+      Capture<Leader> leaderCapture = new Capture<Leader>();
+      leaderCaptures.add(leaderCapture);
+      Capture<LeaderControl> controlCapture = createCapture();
+      leaderControlCaptures.add(controlCapture);
+
+      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+      listener.onLeading(capture(controlCapture));
+      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+      Map<String, InetSocketAddress> aux =
+          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+      endpointStatus.leave();
+      abdicate.execute();
+    }
+
+    control.replay();
+
+    for (int i = 0; i < 5; i++) {
+      final String leaderName = "foo" + i;
+      newLeader(leaderName, leaderCaptures.get(i));
+      leaderControlCaptures.get(i).getValue().advertise();
+      leaderControlCaptures.get(i).getValue().leave();
+    }
+  }
+
+  @Test
+  public void testLeaderLeaves() throws Exception {
+    control.replay();
+    shutdownNetwork();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
deleted file mode 100644
index 454ae22..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
-import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
-import org.easymock.Capture;
-import org.easymock.IExpectationSetters;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.fail;
-
-public class SingletonServiceTest extends BaseZooKeeperTest {
-  private static final int PORT_A = 1234;
-  private static final int PORT_B = 8080;
-  private static final InetSocketAddress PRIMARY_ENDPOINT =
-      InetSocketAddress.createUnresolved("foo", PORT_A);
-  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
-      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
-
-  private IMocksControl control;
-  private SingletonService.LeadershipListener listener;
-  private ServerSet serverSet;
-  private ServerSet.EndpointStatus endpointStatus;
-  private Candidate candidate;
-  private ExceptionalCommand<Group.JoinException> abdicate;
-
-  private SingletonService service;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void mySetUp() throws IOException {
-    control = createControl();
-    addTearDown(control::verify);
-    listener = control.createMock(SingletonService.LeadershipListener.class);
-    serverSet = control.createMock(ServerSet.class);
-    candidate = control.createMock(Candidate.class);
-    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
-    abdicate = control.createMock(ExceptionalCommand.class);
-
-    service = new SingletonService(serverSet, candidate);
-  }
-
-  private void newLeader(
-      final String hostName,
-      Capture<Leader> leader,
-      LeadershipListener listener) throws Exception {
-
-    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
-        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
-        listener);
-
-    // This actually elects the leader.
-    leader.getValue().onElected(abdicate);
-  }
-
-  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
-    newLeader(hostName, leader, listener);
-  }
-
-  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
-    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
-  }
-
-  @Test
-  public void testLeadAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void teatLeadLeaveNoAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    abdicate.execute();
-
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void testLeadJoinFailure() throws Exception {
-    Capture<Leader> leaderCapture = new Capture<Leader>();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-
-    try {
-      controlCapture.getValue().advertise();
-      fail("Join should have failed.");
-    } catch (JoinException e) {
-      // Expected.
-    }
-
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testAdvertiseAfterLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test
-  public void testLeadMulti() throws Exception {
-    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
-    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
-
-    for (int i = 0; i < 5; i++) {
-      Capture<Leader> leaderCapture = new Capture<Leader>();
-      leaderCaptures.add(leaderCapture);
-      Capture<LeaderControl> controlCapture = createCapture();
-      leaderControlCaptures.add(controlCapture);
-
-      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-      listener.onLeading(capture(controlCapture));
-      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
-      Map<String, InetSocketAddress> aux =
-          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
-      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
-      endpointStatus.leave();
-      abdicate.execute();
-    }
-
-    control.replay();
-
-    for (int i = 0; i < 5; i++) {
-      final String leaderName = "foo" + i;
-      newLeader(leaderName, leaderCaptures.get(i));
-      leaderControlCaptures.get(i).getValue().advertise();
-      leaderControlCaptures.get(i).getValue().leave();
-    }
-  }
-
-  @Test
-  public void testLeaderLeaves() throws Exception {
-    control.replay();
-    shutdownNetwork();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index b15540c..debe899 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
@@ -48,8 +47,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.StateMachine;
 import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -83,7 +81,7 @@ import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipList
  * {@link java.lang.IllegalStateException}.
  * <p>
  * At any point in the lifecycle, the scheduler will respond to
- * {@link LeadershipListener#onDefeated(ServerSet.EndpointStatus)
+ * {@link LeadershipListener#onDefeated()
  * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
  * A clean shutdown will also be initiated if control actions fail during normal state transitions.
  */
@@ -274,7 +272,7 @@ public class SchedulerLifecycle implements EventSubscriber {
         schedulerActiveServiceManager.startAsync().awaitHealthy();
         try {
           leaderControl.get().advertise();
-        } catch (JoinException | InterruptedException e) {
+        } catch (SingletonService.AdvertiseException | InterruptedException e) {
           LOG.error("Failed to advertise leader, shutting down.");
           throw Throwables.propagate(e);
         }
@@ -297,10 +295,8 @@ public class SchedulerLifecycle implements EventSubscriber {
           if (control != null) {
             try {
               control.leave();
-            } catch (JoinException e) {
+            } catch (SingletonService.LeaveException e) {
               LOG.warn("Failed to leave leadership: " + e, e);
-            } catch (ServerSet.UpdateException e) {
-              LOG.warn("Failed to leave server set: " + e, e);
             }
           }
 
@@ -395,7 +391,7 @@ public class SchedulerLifecycle implements EventSubscriber {
     }
 
     @Override
-    public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
+    public void onDefeated() {
       LOG.error("Lost leadership, committing suicide.");
       stateMachine.transition(State.DEAD);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 60b66e8..11f6ad1 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,7 +42,6 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
 import org.apache.aurora.common.args.constraints.NotNull;
 import org.apache.aurora.common.inject.Bindings;
 import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.zookeeper.Group;
 import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
 import org.apache.aurora.gen.ServerInfo;
@@ -120,10 +119,8 @@ public class SchedulerMain {
           httpSocketAddress,
           ImmutableMap.of("http", httpSocketAddress),
           leaderListener);
-    } catch (Group.WatchException e) {
-      throw new IllegalStateException("Failed to watch group and lead service.", e);
-    } catch (Group.JoinException e) {
-      throw new IllegalStateException("Failed to join scheduler service group.", e);
+    } catch (SingletonService.LeadException e) {
+      throw new IllegalStateException("Failed to lead service.", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
index 97977fd..240164f 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
@@ -25,6 +25,7 @@ import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.zookeeper.ServerSet;
 import org.apache.aurora.common.zookeeper.ServerSetImpl;
 import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
 import org.apache.aurora.common.zookeeper.ZooKeeperClient;
 import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials;
 import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
@@ -85,8 +86,8 @@ public class ServiceDiscoveryModule extends AbstractModule {
       ServerSet serverSet,
       List<ACL> zookeeperAcls) {
 
-    return new SingletonService(
+    return new SingletonServiceImpl(
         serverSet,
-        SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
+        SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index e225ae5..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -179,7 +179,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
 
     LeadershipListener leaderListener = schedulerLifecycle.prepare();
     leaderListener.onLeading(leaderControl);
-    leaderListener.onDefeated(null);
+    leaderListener.onDefeated();
   }
 
   @Test