You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/16 23:54:15 UTC
git commit: Rewrite SchedulerLifecycle, employing a state machine.
Updated Branches:
refs/heads/master 8bb538f13 -> ad999b9f3
Rewrite SchedulerLifecycle, employing a state machine.
This change addresses two issues:
- Ensure leadership is canceled whenever onDefeated is called.
- The scheduler should wait for registered() to be called before attempting to
invoke the driver.
Some additional structural changes were made:
- Driver.run() is no longer used. Instead, we invoke Driver.start()
(non-blocking), and the lifecycle uses Driver.join() to await exit.
This allows us to avoid jumping through thread-safety hoops in unit tests.
- A shim interface (DelayedActions) was added to SchedulerLifecycle to make
testing easier when capturing delayed closures.
Testing Done:
gradle clean build
Reviewed at https://reviews.apache.org/r/16054/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ad999b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ad999b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ad999b9f
Branch: refs/heads/master
Commit: ad999b9f31a72c830bbedb1be1d6ebf37e570b5c
Parents: 8bb538f
Author: Bill Farner <wf...@apache.org>
Authored: Mon Dec 16 14:53:31 2013 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Mon Dec 16 14:53:31 2013 -0800
----------------------------------------------------------------------
.../com/twitter/aurora/scheduler/Driver.java | 21 +-
.../aurora/scheduler/SchedulerLifecycle.java | 493 ++++++++++++-------
.../aurora/scheduler/SchedulerModule.java | 29 +-
.../aurora/scheduler/app/SchedulerMain.java | 15 +-
.../storage/testing/StorageTestUtil.java | 6 +-
.../twitter/aurora/scheduler/DriverTest.java | 18 +-
.../scheduler/SchedulerLifecycleTest.java | 139 ++++++
.../aurora/scheduler/app/SchedulerIT.java | 16 +-
8 files changed, 527 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/Driver.java b/src/main/java/com/twitter/aurora/scheduler/Driver.java
index e8fe170..aa77887 100644
--- a/src/main/java/com/twitter/aurora/scheduler/Driver.java
+++ b/src/main/java/com/twitter/aurora/scheduler/Driver.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.SchedulerDriver;
@@ -72,11 +73,18 @@ public interface Driver {
void stop();
/**
- * Runs the underlying driver. Can only be called once.
+ * Starts the underlying driver. Can only be called once.
*
* @return The status of the underlying driver run request.
*/
- Protos.Status run();
+ Protos.Status start();
+
+ /**
+ * Blocks until the underlying driver is stopped or aborted.
+ *
+ * @return The status of the underlying driver upon exit.
+ */
+ Protos.Status join();
/**
* Mesos driver implementation.
@@ -134,10 +142,15 @@ public interface Driver {
}
@Override
- public Protos.Status run() {
+ public Protos.Status start() {
SchedulerDriver driver = get(State.INIT);
stateMachine.transition(State.RUNNING);
- return driver.run();
+ return driver.start();
+ }
+
+ @Override
+ public Status join() {
+ return get(State.RUNNING).join();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
index 346d52a..f90869d 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
@@ -15,21 +15,23 @@
*/
package com.twitter.aurora.scheduler;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.BindingAnnotation;
+import com.google.common.util.concurrent.Atomics;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
@@ -43,93 +45,283 @@ import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
import com.twitter.aurora.scheduler.storage.Storage.Work;
import com.twitter.aurora.scheduler.storage.StorageBackfill;
import com.twitter.common.application.Lifecycle;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
-import com.twitter.common.zookeeper.Group;
+import com.twitter.common.util.StateMachine;
+import com.twitter.common.util.StateMachine.Transition;
+import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.SingletonService;
import com.twitter.common.zookeeper.SingletonService.LeaderControl;
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
/**
* The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and
* initialization through acting as a standby scheduler / log replica and finally to becoming the
* scheduler leader.
- *
- * <p>TODO(John Sirois): This class contains the old logic of SchedulerMain - now that its extracted
- * it should be tested.
+ * <p>
+ * The (enforced) call order to be used with this class is:
+ * <ol>
+ * <li>{@link #prepare()}, to initialize the storage system.</li>
+ * <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
+ * {@link LeadershipListener LeadershipListener}
+ * returned from {@link #prepare()}, signaling that this process has exclusive control of the
+ * cluster.</li>
+ * <li>{@link #registered(DriverRegistered) registered()},
+ * indicating that registration with the mesos master has succeeded.
+ * At this point, the scheduler's presence will be announced via
+ * {@link LeaderControl#advertise() advertise()}.</li>
+ * </ol>
+ * If this call order is broken, calls will fail by throwing
+ * {@link java.lang.IllegalStateException}.
+ * <p>
+ * At any point in the lifecycle, the scheduler will respond to
+ * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
+ * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
+ * A clean shutdown will also be initiated if control actions fail during normal state transitions.
*/
public class SchedulerLifecycle implements EventSubscriber {
- /**
- * A {@link SingletonService} scheduler leader candidate that exposes a method for awaiting clean
- * shutdown.
- */
- public interface SchedulerCandidate extends SingletonService.LeadershipListener {
- /**
- * Waits for this candidate to abdicate or otherwise decide to quit.
- */
- void awaitShutdown();
+ private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+
+ private enum State {
+ IDLE,
+ PREPARING_STORAGE,
+ STORAGE_PREPARED,
+ LEADER_AWAITING_REGISTRATION,
+ REGISTERED_LEADER,
+ RUNNING,
+ DEAD
}
- @CmdLine(name = "max_registration_delay",
- help = "Max allowable delay to allow the driver to register before aborting")
- private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
- Arg.create(Amount.of(1L, Time.MINUTES));
+ private static final Predicate<Transition<State>> IS_DEAD = new Predicate<Transition<State>>() {
+ @Override public boolean apply(Transition<State> state) {
+ return state.getTo() == State.DEAD;
+ }
+ };
- @CmdLine(name = "max_leading_duration",
- help = "After leading for this duration, the scheduler should commit suicide.")
- private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
- Arg.create(Amount.of(1L, Time.DAYS));
+ private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
- /**
- * Binding annotation to attach to the flag indicating whether to initiate application shutdown
- * when the driver returns from {@link Driver#run()}.
- */
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface ShutdownOnDriverExit { }
+ private final LeadershipListener leadershipListener;
+ private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
+ private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
+ private final StateMachine<State> stateMachine;
- private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+ @Inject
+ SchedulerLifecycle(
+ final DriverFactory driverFactory,
+ final NonVolatileStorage storage,
+ final Lifecycle lifecycle,
+ final Driver driver,
+ final DriverReference driverRef,
+ final LeadingOptions leadingOptions,
+ final ScheduledExecutorService executorService,
+ final Clock clock) {
+
+ this(
+ driverFactory,
+ storage,
+ lifecycle,
+ driver,
+ driverRef,
+ new DelayedActions() {
+ @Override public void blockingDriverJoin(Runnable runnable) {
+ executorService.execute(runnable);
+ }
- private final DriverFactory driverFactory;
- private final NonVolatileStorage storage;
- private final Lifecycle lifecycle;
- private final CountDownLatch registeredLatch = new CountDownLatch(1);
- private final AtomicInteger registeredFlag = Stats.exportInt("framework_registered");
+ @Override public void onAutoFailover(Runnable runnable) {
+ executorService.schedule(
+ runnable,
+ leadingOptions.leadingTimeLimit.getValue(),
+ leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
+ }
- private final Driver driver;
- private final DriverReference driverRef;
- private final boolean shutdownAfterRunning;
- private final Clock clock;
+ @Override public void onRegistrationTimeout(Runnable runnable) {
+ LOG.info(
+ "Giving up on registration in " + leadingOptions.registrationDelayLimit);
+ executorService.schedule(
+ runnable,
+ leadingOptions.registrationDelayLimit.getValue(),
+ leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
+ }
+ },
+ clock);
+ }
- @Inject
+ @VisibleForTesting
SchedulerLifecycle(
- DriverFactory driverFactory,
- NonVolatileStorage storage,
- Lifecycle lifecycle,
- Driver driver,
- DriverReference driverRef,
- @ShutdownOnDriverExit boolean shutdownAfterRunning,
- Clock clock) {
-
- this.driverFactory = checkNotNull(driverFactory);
- this.storage = checkNotNull(storage);
- this.lifecycle = checkNotNull(lifecycle);
- this.driver = checkNotNull(driver);
- this.driverRef = checkNotNull(driverRef);
- this.shutdownAfterRunning = shutdownAfterRunning;
- this.clock = checkNotNull(clock);
+ final DriverFactory driverFactory,
+ final NonVolatileStorage storage,
+ final Lifecycle lifecycle,
+ // TODO(wfarner): The presence of Driver and DriverReference is quite confusing. Figure out
+ // a clean way to collapse the duties of DriverReference into DriverImpl.
+ final Driver driver,
+ final DriverReference driverRef,
+ final DelayedActions delayedActions,
+ final Clock clock) {
+
+ Stats.export(new StatImpl<Integer>("framework_registered") {
+ @Override public Integer read() {
+ return registrationAcked.get() ? 1 : 0;
+ }
+ });
+ for (final State state : State.values()) {
+ Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
+ @Override public Integer read() {
+ return (state == stateMachine.getState()) ? 1 : 0;
+ }
+ });
+ }
+
+ final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ try {
+ storage.prepare();
+ stateMachine.transition(State.STORAGE_PREPARED);
+ } catch (RuntimeException e) {
+ stateMachine.transition(State.DEAD);
+ throw e;
+ }
+ }
+ };
+
+ final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ LOG.info("Elected as leading scheduler!");
+ storage.start(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ StorageBackfill.backfill(storeProvider, clock);
+ }
+ });
+
+ @Nullable final String frameworkId = storage.consistentRead(
+ new Work.Quiet<String>() {
+ @Override public String apply(StoreProvider storeProvider) {
+ return storeProvider.getSchedulerStore().fetchFrameworkId();
+ }
+ });
+ driverRef.set(driverFactory.apply(frameworkId));
+
+ delayedActions.onRegistrationTimeout(
+ new Runnable() {
+ @Override public void run() {
+ if (!registrationAcked.get()) {
+ LOG.severe(
+ "Framework has not been registered within the tolerated delay.");
+ stateMachine.transition(State.DEAD);
+ }
+ }
+ });
+
+ delayedActions.onAutoFailover(
+ new Runnable() {
+ @Override public void run() {
+ LOG.info("Triggering automatic failover.");
+ stateMachine.transition(State.DEAD);
+ }
+ });
+
+ Protos.Status status = driver.start();
+ LOG.info("Driver started with code " + status);
+ delayedActions.blockingDriverJoin(new Runnable() {
+ @Override public void run() {
+ // Blocks until driver exits.
+ driver.join();
+ stateMachine.transition(State.DEAD);
+ }
+ });
+ }
+ };
+
+ final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ registrationAcked.set(true);
+ try {
+ leaderControl.get().advertise();
+ } catch (JoinException e) {
+ LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
+ stateMachine.transition(State.DEAD);
+ } catch (InterruptedException e) {
+ LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
+ stateMachine.transition(State.DEAD);
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+
+ final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
+ private final AtomicBoolean invoked = new AtomicBoolean(false);
+ @Override public void execute(Transition<State> transition) {
+ if (!invoked.compareAndSet(false, true)) {
+ LOG.info("Shutdown already invoked, ignoring extra call.");
+ return;
+ }
+
+ // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
+ // resources here.
+ try {
+ LeaderControl control = leaderControl.get();
+ if (control != null) {
+ try {
+ control.leave();
+ } catch (JoinException e) {
+ LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
+ } catch (ServerSet.UpdateException e) {
+ LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
+ }
+ }
+
+ // TODO(wfarner): Re-evaluate tear-down ordering here. Should the top-level shutdown
+ // be invoked first, or the underlying critical components?
+ driver.stop();
+ storage.stop();
+ } finally {
+ lifecycle.shutdown();
+ }
+ }
+ };
+
+ stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
+ .initialState(State.IDLE)
+ .logTransitions()
+ .addState(
+ Closures.filter(NOT_DEAD, prepareStorage),
+ State.IDLE,
+ State.PREPARING_STORAGE, State.DEAD)
+ .addState(
+ State.PREPARING_STORAGE,
+ State.STORAGE_PREPARED, State.DEAD)
+ .addState(
+ Closures.filter(NOT_DEAD, handleLeading),
+ State.STORAGE_PREPARED,
+ State.LEADER_AWAITING_REGISTRATION, State.DEAD)
+ .addState(
+ Closures.filter(NOT_DEAD, handleRegistered),
+ State.LEADER_AWAITING_REGISTRATION,
+ State.REGISTERED_LEADER, State.DEAD)
+ .addState(
+ State.REGISTERED_LEADER,
+ State.RUNNING, State.DEAD)
+ .addState(
+ State.RUNNING,
+ State.DEAD)
+ .addState(
+ State.DEAD,
+ // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
+ State.DEAD
+ )
+ .onAnyTransition(
+ Closures.filter(IS_DEAD, shutDown))
+ .build();
+
+ this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
}
/**
@@ -137,144 +329,89 @@ public class SchedulerLifecycle implements EventSubscriber {
* host a live log replica and start syncing data from the leader via the log until it gets called
* upon to lead.
*
- * @return A candidate that can be offered for leadership of a distributed election.
+ * @return A listener that can be offered for leadership of a distributed election.
*/
- public SchedulerCandidate prepare() {
- LOG.info("Preparing storage");
- storage.prepare();
- return new SchedulerCandidateImpl();
+ public LeadershipListener prepare() {
+ stateMachine.transition(State.PREPARING_STORAGE);
+ return leadershipListener;
}
@Subscribe
public void registered(DriverRegistered event) {
- registeredLatch.countDown();
- registeredFlag.set(1);
+ stateMachine.transition(State.REGISTERED_LEADER);
}
/**
* Maintains a reference to the driver.
*/
static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
- private volatile Optional<SchedulerDriver> driver = Optional.absent();
+ private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
@Override public Optional<SchedulerDriver> get() {
- return driver;
+ return Optional.fromNullable(driver.get());
}
private void set(SchedulerDriver ref) {
- this.driver = Optional.of(ref);
+ driver.set(ref);
}
}
- /**
- * Implementation of the scheduler candidate lifecycle.
- */
- private class SchedulerCandidateImpl implements SchedulerCandidate {
- @Override public void onLeading(LeaderControl control) {
- LOG.info("Elected as leading scheduler!");
- try {
- lead();
- control.advertise();
- } catch (Group.JoinException e) {
- LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
- lifecycle.shutdown();
- } catch (InterruptedException e) {
- LOG.log(Level.SEVERE, "Failed to update endpoint status, shutting down.", e);
- lifecycle.shutdown();
- Thread.currentThread().interrupt();
- } catch (RuntimeException e) {
- LOG.log(Level.SEVERE, "Unexpected exception attempting to lead, shutting down.", e);
- lifecycle.shutdown();
- }
- }
+ private static class SchedulerCandidateImpl implements LeadershipListener {
+ private final StateMachine<State> stateMachine;
+ private final AtomicReference<LeaderControl> leaderControl;
- private void startDaemonThread(String name, Runnable work) {
- new ThreadFactoryBuilder()
- .setNameFormat(name + "-%d")
- .setDaemon(true)
- .build()
- .newThread(work)
- .start();
- }
+ SchedulerCandidateImpl(
+ StateMachine<State> stateMachine,
+ AtomicReference<LeaderControl> leaderControl) {
- private void lead() {
- storage.start(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- StorageBackfill.backfill(storeProvider, clock);
- }
- });
- @Nullable final String frameworkId = storage.consistentRead(new Work.Quiet<String>() {
- @Override public String apply(StoreProvider storeProvider) {
- return storeProvider.getSchedulerStore().fetchFrameworkId();
- }
- });
-
- driverRef.set(driverFactory.apply(frameworkId));
-
- startDaemonThread("Driver-Runner", new Runnable() {
- @Override public void run() {
- Protos.Status status = driver.run();
- LOG.info("Driver completed with exit code " + status);
- if (shutdownAfterRunning) {
- lifecycle.shutdown();
- }
- }
- });
-
- startDaemonThread("Driver-Register-Watchdog", new Runnable() {
- @Override public void run() {
- LOG.info(String.format("Waiting up to %s for scheduler registration.",
- MAX_REGISTRATION_DELAY.get()));
-
- try {
- boolean registered = registeredLatch.await(
- MAX_REGISTRATION_DELAY.get().getValue(),
- MAX_REGISTRATION_DELAY.get().getUnit().getTimeUnit());
- if (!registered) {
- LOG.severe("Framework has not been registered within the tolerated delay, quitting.");
- lifecycle.shutdown();
- }
- } catch (InterruptedException e) {
- LOG.log(Level.WARNING, "Delayed registration check interrupted.", e);
- Thread.currentThread().interrupt();
- }
- }
- });
+ this.stateMachine = stateMachine;
+ this.leaderControl = leaderControl;
+ }
- startDaemonThread("Leader-Assassin", new Runnable() {
- @Override public void run() {
- try {
- Thread.sleep(MAX_LEADING_DURATION.get().as(Time.MILLISECONDS));
- LOG.info(
- "Leader has been active for " + MAX_LEADING_DURATION.get() + ", forcing failover.");
- onDefeated(null);
- } catch (InterruptedException e) {
- LOG.warning("Leader assassin thread interrupted.");
- Thread.currentThread().interrupt();
- }
- }
- });
+ @Override public void onLeading(LeaderControl control) {
+ leaderControl.set(control);
+ stateMachine.transition(State.LEADER_AWAITING_REGISTRATION);
}
@Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
- LOG.info("Lost leadership, committing suicide.");
+ LOG.severe("Lost leadership, committing suicide.");
+ stateMachine.transition(State.DEAD);
+ }
+ }
- try {
- if (status != null) {
- status.leave();
- }
+ public static class LeadingOptions {
+ private final Amount<Long, Time> registrationDelayLimit;
+ private final Amount<Long, Time> leadingTimeLimit;
- driver.stop(); // shut down incoming offers
- storage.stop();
- } catch (ServerSet.UpdateException e) {
- LOG.log(Level.WARNING, "Failed to leave server set.", e);
- } finally {
- lifecycle.shutdown(); // shut down the server
- }
+ /**
+ * Creates a new collection of options for tuning leadership behavior.
+ *
+ * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
+ * complete.
+ * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating.
+ */
+ public LeadingOptions(
+ Amount<Long, Time> registrationDelayLimit,
+ Amount<Long, Time> leadingTimeLimit) {
+
+ Preconditions.checkArgument(
+ registrationDelayLimit.getValue() >= 0,
+ "Registration delay limit must be positive.");
+ Preconditions.checkArgument(
+ leadingTimeLimit.getValue() >= 0,
+ "Leading time limit must be positive.");
+
+ this.registrationDelayLimit = checkNotNull(registrationDelayLimit);
+ this.leadingTimeLimit = checkNotNull(leadingTimeLimit);
}
+ }
- @Override public void awaitShutdown() {
- lifecycle.awaitShutdown();
- }
+ @VisibleForTesting
+ interface DelayedActions {
+ void blockingDriverJoin(Runnable runnable);
+
+ void onAutoFailover(Runnable runnable);
+
+ void onRegistrationTimeout(Runnable runnable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
index bd7929d..44aeadc 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
@@ -16,13 +16,17 @@
package com.twitter.aurora.scheduler;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Singleton;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
@@ -32,6 +36,7 @@ import org.apache.mesos.SchedulerDriver;
import com.twitter.aurora.scheduler.Driver.DriverImpl;
import com.twitter.aurora.scheduler.PulseMonitor.PulseMonitorImpl;
import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import com.twitter.aurora.scheduler.events.PubsubEventModule;
import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
@@ -54,6 +59,16 @@ public class SchedulerModule extends AbstractModule {
@CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
+ @CmdLine(name = "max_registration_delay",
+ help = "Max allowable delay to allow the driver to register before aborting")
+ private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
+ Arg.create(Amount.of(1L, Time.MINUTES));
+
+ @CmdLine(name = "max_leading_duration",
+ help = "After leading for this duration, the scheduler should commit suicide.")
+ private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
+ Arg.create(Amount.of(1L, Time.DAYS));
+
@Override
protected void configure() {
bind(Driver.class).to(DriverImpl.class);
@@ -75,7 +90,19 @@ public class SchedulerModule extends AbstractModule {
bind(GcExecutorLauncher.class).in(Singleton.class);
bind(UserTaskLauncher.class).in(Singleton.class);
- bind(SchedulerLifecycle.class).in(Singleton.class);
+ install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(LeadingOptions.class).toInstance(
+ new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
+ bind(ScheduledExecutorService.class).toInstance(executor);
+ bind(SchedulerLifecycle.class).in(Singleton.class);
+ expose(SchedulerLifecycle.class);
+ }
+ });
+
PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
index 22f7217..693c364 100644
--- a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
@@ -40,7 +40,6 @@ import com.twitter.aurora.scheduler.DriverFactory;
import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
import com.twitter.aurora.scheduler.SchedulerLifecycle;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.ShutdownOnDriverExit;
import com.twitter.aurora.scheduler.cron.CronPredictor;
import com.twitter.aurora.scheduler.cron.CronScheduler;
import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
@@ -56,6 +55,7 @@ import com.twitter.aurora.scheduler.thrift.ThriftModule;
import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
import com.twitter.common.application.AbstractApplication;
import com.twitter.common.application.AppLauncher;
+import com.twitter.common.application.Lifecycle;
import com.twitter.common.application.modules.HttpModule;
import com.twitter.common.application.modules.LocalServiceRegistry;
import com.twitter.common.application.modules.LogModule;
@@ -68,6 +68,7 @@ import com.twitter.common.inject.Bindings;
import com.twitter.common.logging.RootLogConfig;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
@@ -133,6 +134,7 @@ public class SchedulerMain extends AbstractApplication {
@Inject private SingletonService schedulerService;
@Inject private LocalServiceRegistry serviceRegistry;
@Inject private SchedulerLifecycle schedulerLifecycle;
+ @Inject private Lifecycle appLifecycle;
@Inject private Optional<RootLogConfig.Configuration> glogConfig;
private static Iterable<? extends Module> getSystemModules() {
@@ -183,7 +185,6 @@ public class SchedulerMain extends AbstractApplication {
@Override protected void configure() {
bind(DriverFactory.class).to(DriverFactoryImpl.class);
bind(DriverFactoryImpl.class).in(Singleton.class);
- bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(true);
install(new MesosLogStreamModule(zkClientConfig));
}
};
@@ -205,7 +206,6 @@ public class SchedulerMain extends AbstractApplication {
}
});
bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
- bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(true);
}
};
@@ -227,7 +227,7 @@ public class SchedulerMain extends AbstractApplication {
LOG.warning("Running without expected glog configuration.");
}
- SchedulerLifecycle.SchedulerCandidate candidate = schedulerLifecycle.prepare();
+ LeadershipListener leaderListener = schedulerLifecycle.prepare();
Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
if (!primarySocket.isPresent()) {
@@ -235,7 +235,10 @@ public class SchedulerMain extends AbstractApplication {
}
try {
- schedulerService.lead(primarySocket.get(), serviceRegistry.getAuxiliarySockets(), candidate);
+ schedulerService.lead(
+ primarySocket.get(),
+ serviceRegistry.getAuxiliarySockets(),
+ leaderListener);
} catch (Group.WatchException e) {
throw new IllegalStateException("Failed to watch group and lead service.", e);
} catch (Group.JoinException e) {
@@ -244,7 +247,7 @@ public class SchedulerMain extends AbstractApplication {
throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
}
- candidate.awaitShutdown();
+ appLifecycle.awaitShutdown();
}
public static void main(String[] args) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
index ceef9d3..d735828 100644
--- a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -27,9 +27,9 @@ import com.twitter.aurora.scheduler.storage.JobStore;
import com.twitter.aurora.scheduler.storage.LockStore;
import com.twitter.aurora.scheduler.storage.QuotaStore;
import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
import com.twitter.aurora.scheduler.storage.Storage.Work;
import com.twitter.aurora.scheduler.storage.TaskStore;
@@ -54,7 +54,7 @@ public class StorageTestUtil {
public final JobStore.Mutable jobStore;
public final LockStore.Mutable lockStore;
public final SchedulerStore.Mutable schedulerStore;
- public final Storage storage;
+ public final NonVolatileStorage storage;
/**
* Creates a new storage test utility.
@@ -70,7 +70,7 @@ public class StorageTestUtil {
this.jobStore = easyMock.createMock(JobStore.Mutable.class);
this.lockStore = easyMock.createMock(LockStore.Mutable.class);
this.schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
- this.storage = easyMock.createMock(Storage.class);
+ this.storage = easyMock.createMock(NonVolatileStorage.class);
}
private <T> IExpectationSetters<T> expectConsistentRead() {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/DriverTest.java b/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
index 5609b0b..f6f7c23 100644
--- a/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
@@ -62,11 +62,11 @@ public class DriverTest extends EasyMockTest {
@Test
public void testMultipleStops() {
expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
- expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+ expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.run());
+ assertEquals(DRIVER_RUNNING, driver.start());
driver.stop();
driver.stop();
}
@@ -74,24 +74,24 @@ public class DriverTest extends EasyMockTest {
@Test
public void testStop() {
expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
- expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+ expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.run());
+ assertEquals(DRIVER_RUNNING, driver.start());
driver.stop();
}
@Test
public void testNormalLifecycle() {
expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(4);
- expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+ expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.run());
+ assertEquals(DRIVER_RUNNING, driver.start());
driver.killTask(TASK_1);
driver.killTask(TASK_2);
driver.stop();
@@ -107,10 +107,10 @@ public class DriverTest extends EasyMockTest {
@Test(expected = IllegalStateException.class)
public void testOnlyOneRunAllowed() {
expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver));
- expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+ expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
control.replay();
- assertEquals(DRIVER_RUNNING, driver.run());
- driver.run();
+ assertEquals(DRIVER_RUNNING, driver.start());
+ driver.start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
new file mode 100644
index 0000000..3336875
--- /dev/null
+++ b/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.SchedulerDriver;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DelayedActions;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import com.twitter.aurora.scheduler.storage.testing.StorageTestUtil;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Command;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+public class SchedulerLifecycleTest extends EasyMockTest {
+
+ private static final String FRAMEWORK_ID = "framework id";
+
+ private DriverFactory driverFactory;
+ private StorageTestUtil storageUtil;
+ private Command shutdownRegistry;
+ private Driver driver;
+ private DriverReference driverRef;
+ private LeaderControl leaderControl;
+ private SchedulerDriver schedulerDriver;
+ private DelayedActions delayedActions;
+
+ private SchedulerLifecycle schedulerLifecycle;
+
+ @Before
+ public void setUp() {
+ driverFactory = createMock(DriverFactory.class);
+ storageUtil = new StorageTestUtil(this);
+ shutdownRegistry = createMock(Command.class);
+ driver = createMock(Driver.class);
+ driverRef = new DriverReference();
+ leaderControl = createMock(LeaderControl.class);
+ schedulerDriver = createMock(SchedulerDriver.class);
+ delayedActions = createMock(DelayedActions.class);
+ schedulerLifecycle = new SchedulerLifecycle(
+ driverFactory,
+ storageUtil.storage,
+ new Lifecycle(shutdownRegistry, new UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread t, Throwable e) {
+ fail(e.getMessage());
+ }
+ }),
+ driver,
+ driverRef,
+ delayedActions,
+ createMock(Clock.class));
+ }
+
+ @Test
+ public void testAutoFailover() throws Throwable {
+ // Test that when timed failover is initiated, cleanup is done in a way that should allow the
+ // application to tear down cleanly. Specifically, neglecting to call leaderControl.leave()
+ // can result in a lame duck scheduler process.
+
+ storageUtil.storage.prepare();
+
+ storageUtil.storage.start(EasyMock.<Quiet>anyObject());
+ storageUtil.expectOperations();
+ expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(FRAMEWORK_ID);
+ expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
+ Capture<Runnable> triggerFailoverCapture = createCapture();
+ delayedActions.onAutoFailover(capture(triggerFailoverCapture));
+ delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
+ expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+ delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+
+ leaderControl.advertise();
+ leaderControl.leave();
+ driver.stop();
+ storageUtil.storage.stop();
+ shutdownRegistry.execute();
+
+ control.replay();
+
+ LeadershipListener leaderListener = schedulerLifecycle.prepare();
+ leaderListener.onLeading(leaderControl);
+ schedulerLifecycle.registered(new DriverRegistered());
+ triggerFailoverCapture.getValue().run();
+ }
+
+ @Test
+ public void testDefeatedBeforeRegistered() throws Throwable {
+ storageUtil.storage.prepare();
+ storageUtil.storage.start(EasyMock.<Quiet>anyObject());
+ storageUtil.expectOperations();
+ expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(FRAMEWORK_ID);
+ expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
+ delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
+ delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
+ expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+ delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+
+ // The important piece here is what's absent: leader presence is not advertised.
+ leaderControl.leave();
+ driver.stop();
+ storageUtil.storage.stop();
+ shutdownRegistry.execute();
+
+ control.replay();
+
+ LeadershipListener leaderListener = schedulerLifecycle.prepare();
+ leaderListener.onLeading(leaderControl);
+ leaderListener.onDefeated(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java b/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
index 4c381b9..1242d46 100644
--- a/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
@@ -72,7 +72,6 @@ import com.twitter.aurora.gen.storage.Transaction;
import com.twitter.aurora.gen.storage.storageConstants;
import com.twitter.aurora.scheduler.DriverFactory;
import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.ShutdownOnDriverExit;
import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
import com.twitter.aurora.scheduler.log.Log;
import com.twitter.aurora.scheduler.log.Log.Entry;
@@ -187,7 +186,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
}
);
bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
- bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(false);
install(new BackupModule(backupDir, SnapshotStoreImpl.class));
}
};
@@ -330,8 +328,12 @@ public class SchedulerIT extends BaseZooKeeperTest {
logStream.close();
expectLastCall().anyTimes();
- expect(driver.run()).andAnswer(new IAnswer<Status>() {
- @Override public Status answer() throws InterruptedException {
+ final AtomicReference<Scheduler> scheduler = Atomics.newReference();
+ expect(driver.start()).andAnswer(new IAnswer<Status>() {
+ @Override public Status answer() {
+ scheduler.get().registered(driver,
+ FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
+ MasterInfo.getDefaultInstance());
return Status.DRIVER_RUNNING;
}
});
@@ -339,13 +341,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
control.replay();
startScheduler();
+ scheduler.set(getScheduler());
awaitSchedulerReady();
- Scheduler scheduler = getScheduler();
- scheduler.registered(driver,
- FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
- MasterInfo.getDefaultInstance());
-
assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());
assertEquals(1L, Stats.<Long>getVariable("task_store_RUNNING").read().longValue());