You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/05 04:42:20 UTC
aurora git commit: Extract a SingletonService interface.
Repository: aurora
Updated Branches:
refs/heads/master f402899bc -> f729fd4d2
Extract a SingletonService interface.
This makes space for introducing an Apache Curator implementation.
Bugs closed: AURORA-1468
Reviewed at https://reviews.apache.org/r/45723/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f729fd4d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f729fd4d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f729fd4d
Branch: refs/heads/master
Commit: f729fd4d2a57e6a83904d680b59429aa5daf9a78
Parents: f402899
Author: John Sirois <js...@apache.org>
Authored: Mon Apr 4 20:42:17 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Mon Apr 4 20:42:17 2016 -0600
----------------------------------------------------------------------
.../common/zookeeper/SingletonService.java | 133 +++-------
.../common/zookeeper/SingletonServiceImpl.java | 122 ++++++++++
.../zookeeper/SingletonServiceImplTest.java | 243 ++++++++++++++++++
.../common/zookeeper/SingletonServiceTest.java | 244 -------------------
.../aurora/scheduler/SchedulerLifecycle.java | 14 +-
.../aurora/scheduler/app/SchedulerMain.java | 7 +-
.../scheduler/app/ServiceDiscoveryModule.java | 5 +-
.../scheduler/SchedulerLifecycleTest.java | 2 +-
8 files changed, 415 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
index 20accd2..3561d07 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -15,56 +15,38 @@ package org.apache.aurora.common.zookeeper;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
/**
- * A service that uses master election to only allow a single instance of the server to join
- * the {@link ServerSet} at a time.
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
*/
-public class SingletonService {
- @VisibleForTesting
- static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+public interface SingletonService {
/**
- * Creates a candidate that can be combined with an existing server set to form a singleton
- * service using {@link #SingletonService(ServerSet, Candidate)}.
- *
- * @param zkClient The ZooKeeper client to use.
- * @param servicePath The path where service nodes live.
- * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
- * @return A candidate that can be housed with a standard server set under a single zk path.
+ * Indicates an error attempting to lead a group of servers.
*/
- public static Candidate createSingletonCandidate(
- ZooKeeperClient zkClient,
- String servicePath,
- Iterable<ACL> acl) {
-
- return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+ class LeadException extends Exception {
+ public LeadException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
- private final ServerSet serverSet;
- private final Candidate candidate;
+ /**
+ * Indicates an error attempting to advertise leadership of a group of servers.
+ */
+ class AdvertiseException extends Exception {
+ public AdvertiseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
/**
- * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
- * advertises itself in the given server set once elected.
- *
- * @param serverSet The server set to advertise in on election.
- * @param candidate The candidacy to use to vie for election.
+ * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
*/
- public SingletonService(ServerSet serverSet, Candidate candidate) {
- this.serverSet = Preconditions.checkNotNull(serverSet);
- this.candidate = Preconditions.checkNotNull(candidate);
+ class LeaveException extends Exception {
+ public LeaveException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
/**
@@ -73,55 +55,20 @@ public class SingletonService {
* @param endpoint The primary endpoint to register as a leader candidate in the service.
* @param additionalEndpoints Additional endpoints that are available on the host.
* @param listener Handler to call when the candidate is elected or defeated.
- * @throws Group.WatchException If there was a problem watching the ZooKeeper group.
- * @throws Group.JoinException If there was a problem joining the ZooKeeper group.
+ * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
* @throws InterruptedException If the thread watching/joining the group was interrupted.
*/
- public void lead(final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints,
- final LeadershipListener listener)
- throws Group.WatchException, Group.JoinException, InterruptedException {
-
- Preconditions.checkNotNull(listener);
-
- candidate.offerLeadership(new Leader() {
- private ServerSet.EndpointStatus endpointStatus = null;
- @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
- listener.onLeading(new LeaderControl() {
- ServerSet.EndpointStatus endpointStatus = null;
- final AtomicBoolean left = new AtomicBoolean(false);
-
- // Methods are synchronized to prevent simultaneous invocations.
- @Override public synchronized void advertise()
- throws JoinException, InterruptedException {
-
- Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
- Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
- endpointStatus = serverSet.join(endpoint, additionalEndpoints);
- }
-
- @Override public synchronized void leave() throws ServerSet.UpdateException, JoinException {
- Preconditions.checkState(left.compareAndSet(false, true),
- "Cannot leave more than once.");
- if (endpointStatus != null) {
- endpointStatus.leave();
- }
- abdicate.execute();
- }
- });
- }
-
- @Override public void onDefeated() {
- listener.onDefeated(endpointStatus);
- }
- });
- }
+ void lead(
+ InetSocketAddress endpoint,
+ Map<String, InetSocketAddress> additionalEndpoints,
+ LeadershipListener listener)
+ throws LeadException, InterruptedException;
/**
* A listener to be notified of changes in the leadership status.
* Implementers should be careful to avoid blocking operations in these callbacks.
*/
- public interface LeadershipListener {
+ interface LeadershipListener {
/**
* Notifies the listener that is is current leader.
@@ -131,35 +78,33 @@ public class SingletonService {
void onLeading(LeaderControl control);
/**
- * Notifies the listener that it is no longer leader. The leader should take this opportunity
- * to remove its advertisement gracefully.
- *
- * @param status A handle on the endpoint status for the advertised leader.
+ * Notifies the listener that it is no longer leader.
*/
- void onDefeated(@Nullable ServerSet.EndpointStatus status);
+ void onDefeated();
}
/**
* A controller for the state of the leader. This will be provided to the leader upon election,
- * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and
- * terminate leadership at will.
+ * which allows the leader to decide when to advertise as leader of the server set and terminate
+ * leadership at will.
*/
- public interface LeaderControl {
+ interface LeaderControl {
/**
* Advertises the leader's server presence to clients.
*
- * @throws JoinException If there was an error advertising.
+ * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+ * of the server set.
* @throws InterruptedException If interrupted while advertising.
*/
- void advertise() throws JoinException, InterruptedException;
+ void advertise() throws AdvertiseException, InterruptedException;
/**
* Leaves candidacy for leadership, removing advertised server presence if applicable.
*
- * @throws ServerSet.UpdateException If the leader's status could not be updated.
- * @throws JoinException If there was an error abdicating from leader election.
+ * @throws LeaveException If the leader's status could not be updated or there was an error
+ * abdicating server set leadership.
*/
- void leave() throws ServerSet.UpdateException, JoinException;
+ void leave() throws LeaveException;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+public class SingletonServiceImpl implements SingletonService {
+ @VisibleForTesting
+ static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+ /**
+ * Creates a candidate that can be combined with an existing server set to form a singleton
+ * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+ *
+ * @param zkClient The ZooKeeper client to use.
+ * @param servicePath The path where service nodes live.
+ * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+ * @return A candidate that can be housed with a standard server set under a single zk path.
+ */
+ public static Candidate createSingletonCandidate(
+ ZooKeeperClient zkClient,
+ String servicePath,
+ Iterable<ACL> acl) {
+
+ return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+ }
+
+ private final ServerSet serverSet;
+ private final Candidate candidate;
+
+ /**
+ * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
+ * advertises itself in the given server set once elected.
+ *
+ * @param serverSet The server set to advertise in on election.
+ * @param candidate The candidacy to use to vie for election.
+ */
+ public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+ this.serverSet = Preconditions.checkNotNull(serverSet);
+ this.candidate = Preconditions.checkNotNull(candidate);
+ }
+
+ @Override
+ public void lead(final InetSocketAddress endpoint,
+ final Map<String, InetSocketAddress> additionalEndpoints,
+ final LeadershipListener listener)
+ throws LeadException, InterruptedException {
+
+ Preconditions.checkNotNull(listener);
+
+ try {
+ candidate.offerLeadership(new Leader() {
+ @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+ listener.onLeading(new LeaderControl() {
+ ServerSet.EndpointStatus endpointStatus = null;
+ final AtomicBoolean left = new AtomicBoolean(false);
+
+ // Methods are synchronized to prevent simultaneous invocations.
+ @Override public synchronized void advertise()
+ throws AdvertiseException, InterruptedException {
+
+ Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+ Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+ try {
+ endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+ } catch (JoinException e) {
+ throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+ }
+ }
+
+ @Override public synchronized void leave() throws LeaveException {
+ Preconditions.checkState(left.compareAndSet(false, true),
+ "Cannot leave more than once.");
+ if (endpointStatus != null) {
+ try {
+ endpointStatus.leave();
+ } catch (ServerSet.UpdateException e) {
+ throw new LeaveException("Problem updating endpoint status for abdicating leader " +
+ "at endpoint " + endpoint, e);
+ }
+ }
+ try {
+ abdicate.execute();
+ } catch (JoinException e) {
+ throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
+ }
+ }
+ });
+ }
+
+ @Override public void onDefeated() {
+ listener.onDefeated();
+ }
+ });
+ } catch (JoinException e) {
+ throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+ } catch (Group.WatchException e) {
+ throw new LeadException("Problem getting initial membership list for leadership group.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..82df845
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+public class SingletonServiceImplTest extends BaseZooKeeperTest {
+ private static final int PORT_A = 1234;
+ private static final int PORT_B = 8080;
+ private static final InetSocketAddress PRIMARY_ENDPOINT =
+ InetSocketAddress.createUnresolved("foo", PORT_A);
+ private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+ private IMocksControl control;
+ private SingletonServiceImpl.LeadershipListener listener;
+ private ServerSet serverSet;
+ private ServerSet.EndpointStatus endpointStatus;
+ private Candidate candidate;
+ private ExceptionalCommand<Group.JoinException> abdicate;
+
+ private SingletonService service;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void mySetUp() throws IOException {
+ control = createControl();
+ addTearDown(control::verify);
+ listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+ serverSet = control.createMock(ServerSet.class);
+ candidate = control.createMock(Candidate.class);
+ endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+ abdicate = control.createMock(ExceptionalCommand.class);
+
+ service = new SingletonServiceImpl(serverSet, candidate);
+ }
+
+ private void newLeader(
+ final String hostName,
+ Capture<Leader> leader,
+ LeadershipListener listener) throws Exception {
+
+ service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+ listener);
+
+ // This actually elects the leader.
+ leader.getValue().onElected(abdicate);
+ }
+
+ private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+ newLeader(hostName, leader, listener);
+ }
+
+ private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+ return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+ }
+
+ @Test
+ public void testLeadAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().leave();
+ }
+
+ @Test
+ public void teatLeadLeaveNoAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ abdicate.execute();
+
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().leave();
+ }
+
+ @Test
+ public void testLeadJoinFailure() throws Exception {
+ Capture<Leader> leaderCapture = new Capture<Leader>();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+
+ try {
+ controlCapture.getValue().advertise();
+ fail("Join should have failed.");
+ } catch (SingletonService.AdvertiseException e) {
+ // Expected.
+ }
+
+ controlCapture.getValue().leave();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMultipleAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().advertise();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMultipleLeave() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().leave();
+ controlCapture.getValue().leave();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAdvertiseAfterLeave() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().leave();
+ controlCapture.getValue().advertise();
+ }
+
+ @Test
+ public void testLeadMulti() throws Exception {
+ List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+ List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+ for (int i = 0; i < 5; i++) {
+ Capture<Leader> leaderCapture = new Capture<Leader>();
+ leaderCaptures.add(leaderCapture);
+ Capture<LeaderControl> controlCapture = createCapture();
+ leaderControlCaptures.add(controlCapture);
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ listener.onLeading(capture(controlCapture));
+ InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+ Map<String, InetSocketAddress> aux =
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+ expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+ }
+
+ control.replay();
+
+ for (int i = 0; i < 5; i++) {
+ final String leaderName = "foo" + i;
+ newLeader(leaderName, leaderCaptures.get(i));
+ leaderControlCaptures.get(i).getValue().advertise();
+ leaderControlCaptures.get(i).getValue().leave();
+ }
+ }
+
+ @Test
+ public void testLeaderLeaves() throws Exception {
+ control.replay();
+ shutdownNetwork();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
deleted file mode 100644
index 454ae22..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
-import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
-import org.easymock.Capture;
-import org.easymock.IExpectationSetters;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.fail;
-
-public class SingletonServiceTest extends BaseZooKeeperTest {
- private static final int PORT_A = 1234;
- private static final int PORT_B = 8080;
- private static final InetSocketAddress PRIMARY_ENDPOINT =
- InetSocketAddress.createUnresolved("foo", PORT_A);
- private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
-
- private IMocksControl control;
- private SingletonService.LeadershipListener listener;
- private ServerSet serverSet;
- private ServerSet.EndpointStatus endpointStatus;
- private Candidate candidate;
- private ExceptionalCommand<Group.JoinException> abdicate;
-
- private SingletonService service;
-
- @Before
- @SuppressWarnings("unchecked")
- public void mySetUp() throws IOException {
- control = createControl();
- addTearDown(control::verify);
- listener = control.createMock(SingletonService.LeadershipListener.class);
- serverSet = control.createMock(ServerSet.class);
- candidate = control.createMock(Candidate.class);
- endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
- abdicate = control.createMock(ExceptionalCommand.class);
-
- service = new SingletonService(serverSet, candidate);
- }
-
- private void newLeader(
- final String hostName,
- Capture<Leader> leader,
- LeadershipListener listener) throws Exception {
-
- service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
- listener);
-
- // This actually elects the leader.
- leader.getValue().onElected(abdicate);
- }
-
- private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
- newLeader(hostName, leader, listener);
- }
-
- private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
- return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
- }
-
- @Test
- public void testLeadAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().leave();
- }
-
- @Test
- public void teatLeadLeaveNoAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- abdicate.execute();
-
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().leave();
- }
-
- @Test
- public void testLeadJoinFailure() throws Exception {
- Capture<Leader> leaderCapture = new Capture<Leader>();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
-
- try {
- controlCapture.getValue().advertise();
- fail("Join should have failed.");
- } catch (JoinException e) {
- // Expected.
- }
-
- controlCapture.getValue().leave();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testMultipleAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().advertise();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testMultipleLeave() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().leave();
- controlCapture.getValue().leave();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testAdvertiseAfterLeave() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().leave();
- controlCapture.getValue().advertise();
- }
-
- @Test
- public void testLeadMulti() throws Exception {
- List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
- List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
-
- for (int i = 0; i < 5; i++) {
- Capture<Leader> leaderCapture = new Capture<Leader>();
- leaderCaptures.add(leaderCapture);
- Capture<LeaderControl> controlCapture = createCapture();
- leaderControlCaptures.add(controlCapture);
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- listener.onLeading(capture(controlCapture));
- InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
- Map<String, InetSocketAddress> aux =
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
- expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
- }
-
- control.replay();
-
- for (int i = 0; i < 5; i++) {
- final String leaderName = "foo" + i;
- newLeader(leaderName, leaderCaptures.get(i));
- leaderControlCaptures.get(i).getValue().advertise();
- leaderControlCaptures.get(i).getValue().leave();
- }
- }
-
- @Test
- public void testLeaderLeaves() throws Exception {
- control.replay();
- shutdownNetwork();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index b15540c..debe899 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Qualifier;
@@ -48,8 +47,7 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.StateMachine;
import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -83,7 +81,7 @@ import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipList
* {@link java.lang.IllegalStateException}.
* <p>
* At any point in the lifecycle, the scheduler will respond to
- * {@link LeadershipListener#onDefeated(ServerSet.EndpointStatus)
+ * {@link LeadershipListener#onDefeated()
* onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
* A clean shutdown will also be initiated if control actions fail during normal state transitions.
*/
@@ -274,7 +272,7 @@ public class SchedulerLifecycle implements EventSubscriber {
schedulerActiveServiceManager.startAsync().awaitHealthy();
try {
leaderControl.get().advertise();
- } catch (JoinException | InterruptedException e) {
+ } catch (SingletonService.AdvertiseException | InterruptedException e) {
LOG.error("Failed to advertise leader, shutting down.");
throw Throwables.propagate(e);
}
@@ -297,10 +295,8 @@ public class SchedulerLifecycle implements EventSubscriber {
if (control != null) {
try {
control.leave();
- } catch (JoinException e) {
+ } catch (SingletonService.LeaveException e) {
LOG.warn("Failed to leave leadership: " + e, e);
- } catch (ServerSet.UpdateException e) {
- LOG.warn("Failed to leave server set: " + e, e);
}
}
@@ -395,7 +391,7 @@ public class SchedulerLifecycle implements EventSubscriber {
}
@Override
- public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
+ public void onDefeated() {
LOG.error("Lost leadership, committing suicide.");
stateMachine.transition(State.DEAD);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 60b66e8..11f6ad1 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,7 +42,6 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
import org.apache.aurora.common.args.constraints.NotNull;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.zookeeper.Group;
import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.gen.ServerInfo;
@@ -120,10 +119,8 @@ public class SchedulerMain {
httpSocketAddress,
ImmutableMap.of("http", httpSocketAddress),
leaderListener);
- } catch (Group.WatchException e) {
- throw new IllegalStateException("Failed to watch group and lead service.", e);
- } catch (Group.JoinException e) {
- throw new IllegalStateException("Failed to join scheduler service group.", e);
+ } catch (SingletonService.LeadException e) {
+ throw new IllegalStateException("Failed to lead service.", e);
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
index 97977fd..240164f 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
@@ -25,6 +25,7 @@ import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.zookeeper.ServerSet;
import org.apache.aurora.common.zookeeper.ServerSetImpl;
import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
import org.apache.aurora.common.zookeeper.ZooKeeperClient;
import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials;
import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
@@ -85,8 +86,8 @@ public class ServiceDiscoveryModule extends AbstractModule {
ServerSet serverSet,
List<ACL> zookeeperAcls) {
- return new SingletonService(
+ return new SingletonServiceImpl(
serverSet,
- SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
+ SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f729fd4d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index e225ae5..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -179,7 +179,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
LeadershipListener leaderListener = schedulerLifecycle.prepare();
leaderListener.onLeading(leaderControl);
- leaderListener.onDefeated(null);
+ leaderListener.onDefeated();
}
@Test