You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/16 23:54:15 UTC

git commit: Rewrite SchedulerLifecycle, employing a state machine.

Updated Branches:
  refs/heads/master 8bb538f13 -> ad999b9f3


Rewrite SchedulerLifecycle, employing a state machine.

This change addresses two issues:
  - Ensure leadership is canceled whenever onDefeated is called.
  - The scheduler should wait for registered() to be called before attempting
    to invoke the driver.

Some additional structural changes were made:
  - Driver.run() is no longer used.  Instead, we invoke Driver.start()
    (non-blocking), and the lifecycle uses Driver.join() to await exit.
    This allows us to avoid jumping through thread-safety hoops in unit tests.
  - A shim interface (DelayedActions) was added to SchedulerLifecycle to make
    testing easier when capturing delayed closures.

Testing Done:
gradle clean build

Reviewed at https://reviews.apache.org/r/16054/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ad999b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ad999b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ad999b9f

Branch: refs/heads/master
Commit: ad999b9f31a72c830bbedb1be1d6ebf37e570b5c
Parents: 8bb538f
Author: Bill Farner <wf...@apache.org>
Authored: Mon Dec 16 14:53:31 2013 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Mon Dec 16 14:53:31 2013 -0800

----------------------------------------------------------------------
 .../com/twitter/aurora/scheduler/Driver.java    |  21 +-
 .../aurora/scheduler/SchedulerLifecycle.java    | 493 ++++++++++++-------
 .../aurora/scheduler/SchedulerModule.java       |  29 +-
 .../aurora/scheduler/app/SchedulerMain.java     |  15 +-
 .../storage/testing/StorageTestUtil.java        |   6 +-
 .../twitter/aurora/scheduler/DriverTest.java    |  18 +-
 .../scheduler/SchedulerLifecycleTest.java       | 139 ++++++
 .../aurora/scheduler/app/SchedulerIT.java       |  16 +-
 8 files changed, 527 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/Driver.java b/src/main/java/com/twitter/aurora/scheduler/Driver.java
index e8fe170..aa77887 100644
--- a/src/main/java/com/twitter/aurora/scheduler/Driver.java
+++ b/src/main/java/com/twitter/aurora/scheduler/Driver.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.SchedulerDriver;
 
@@ -72,11 +73,18 @@ public interface Driver {
   void stop();
 
   /**
-   * Runs the underlying driver.  Can only be called once.
+   * Starts the underlying driver.  Can only be called once.
    *
    * @return The status of the underlying driver run request.
    */
-  Protos.Status run();
+  Protos.Status start();
+
+  /**
+   * Blocks until the underlying driver is stopped or aborted.
+   *
+   * @return The status of the underlying driver upon exit.
+   */
+  Protos.Status join();
 
   /**
    * Mesos driver implementation.
@@ -134,10 +142,15 @@ public interface Driver {
     }
 
     @Override
-    public Protos.Status run() {
+    public Protos.Status start() {
       SchedulerDriver driver = get(State.INIT);
       stateMachine.transition(State.RUNNING);
-      return driver.run();
+      return driver.start();
+    }
+
+    @Override
+    public Status join() {
+      return get(State.RUNNING).join();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
index 346d52a..f90869d 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
@@ -15,21 +15,23 @@
  */
 package com.twitter.aurora.scheduler;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
 import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.BindingAnnotation;
+import com.google.common.util.concurrent.Atomics;
 
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
@@ -43,93 +45,283 @@ import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
 import com.twitter.aurora.scheduler.storage.Storage.Work;
 import com.twitter.aurora.scheduler.storage.StorageBackfill;
 import com.twitter.common.application.Lifecycle;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
-import com.twitter.common.zookeeper.Group;
+import com.twitter.common.util.StateMachine;
+import com.twitter.common.util.StateMachine.Transition;
+import com.twitter.common.zookeeper.Group.JoinException;
 import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.SingletonService;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
 /**
  * The central driver of the scheduler runtime lifecycle.  Handles the transitions from startup and
  * initialization through acting as a standby scheduler / log replica and finally to becoming the
  * scheduler leader.
- *
- * <p>TODO(John Sirois): This class contains the old logic of SchedulerMain - now that its extracted
- * it should be tested.
+ * <p>
+ * The (enforced) call order to be used with this class is:
+ * <ol>
+ *   <li>{@link #prepare()}, to initialize the storage system.</li>
+ *   <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
+ *       {@link LeadershipListener LeadershipListener}
+ *       returned from {@link #prepare()}, signaling that this process has exclusive control of the
+ *       cluster.</li>
+ *   <li>{@link #registered(DriverRegistered) registered()},
+ *       indicating that registration with the mesos master has succeeded.
+ *       At this point, the scheduler's presence will be announced via
+ *       {@link LeaderControl#advertise() advertise()}.</li>
+ * </ol>
+ * If this call order is broken, calls will fail by throwing
+ * {@link java.lang.IllegalStateException}.
+ * <p>
+ * At any point in the lifecycle, the scheduler will respond to
+ * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
+ * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
+ * A clean shutdown will also be initiated if control actions fail during normal state transitions.
  */
 public class SchedulerLifecycle implements EventSubscriber {
 
-  /**
-   * A {@link SingletonService} scheduler leader candidate that exposes a method for awaiting clean
-   * shutdown.
-   */
-  public interface SchedulerCandidate extends SingletonService.LeadershipListener {
-    /**
-     * Waits for this candidate to abdicate or otherwise decide to quit.
-     */
-    void awaitShutdown();
+  private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+
+  private enum State {
+    IDLE,
+    PREPARING_STORAGE,
+    STORAGE_PREPARED,
+    LEADER_AWAITING_REGISTRATION,
+    REGISTERED_LEADER,
+    RUNNING,
+    DEAD
   }
 
-  @CmdLine(name = "max_registration_delay",
-      help = "Max allowable delay to allow the driver to register before aborting")
-  private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
-      Arg.create(Amount.of(1L, Time.MINUTES));
+  private static final Predicate<Transition<State>> IS_DEAD = new Predicate<Transition<State>>() {
+    @Override public boolean apply(Transition<State> state) {
+      return state.getTo() == State.DEAD;
+    }
+  };
 
-  @CmdLine(name = "max_leading_duration",
-      help = "After leading for this duration, the scheduler should commit suicide.")
-  private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
-      Arg.create(Amount.of(1L, Time.DAYS));
+  private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
 
-  /**
-   * Binding annotation to attach to the flag indicating whether to initiate application shutdown
-   * when the driver returns from {@link Driver#run()}.
-   */
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface ShutdownOnDriverExit { }
+  private final LeadershipListener leadershipListener;
+  private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
+  private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
+  private final StateMachine<State> stateMachine;
 
-  private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+  @Inject
+  SchedulerLifecycle(
+      final DriverFactory driverFactory,
+      final NonVolatileStorage storage,
+      final Lifecycle lifecycle,
+      final Driver driver,
+      final DriverReference driverRef,
+      final LeadingOptions leadingOptions,
+      final ScheduledExecutorService executorService,
+      final Clock clock) {
+
+    this(
+        driverFactory,
+        storage,
+        lifecycle,
+        driver,
+        driverRef,
+        new DelayedActions() {
+          @Override public void blockingDriverJoin(Runnable runnable) {
+            executorService.execute(runnable);
+          }
 
-  private final DriverFactory driverFactory;
-  private final NonVolatileStorage storage;
-  private final Lifecycle lifecycle;
-  private final CountDownLatch registeredLatch = new CountDownLatch(1);
-  private final AtomicInteger registeredFlag = Stats.exportInt("framework_registered");
+          @Override public void onAutoFailover(Runnable runnable) {
+            executorService.schedule(
+                runnable,
+                leadingOptions.leadingTimeLimit.getValue(),
+                leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
+          }
 
-  private final Driver driver;
-  private final DriverReference driverRef;
-  private final boolean shutdownAfterRunning;
-  private final Clock clock;
+          @Override public void onRegistrationTimeout(Runnable runnable) {
+            LOG.info(
+                "Giving up on registration in " + leadingOptions.registrationDelayLimit);
+            executorService.schedule(
+                runnable,
+                leadingOptions.registrationDelayLimit.getValue(),
+                leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
+          }
+        },
+        clock);
+  }
 
-  @Inject
+  @VisibleForTesting
   SchedulerLifecycle(
-      DriverFactory driverFactory,
-      NonVolatileStorage storage,
-      Lifecycle lifecycle,
-      Driver driver,
-      DriverReference driverRef,
-      @ShutdownOnDriverExit boolean shutdownAfterRunning,
-      Clock clock) {
-
-    this.driverFactory = checkNotNull(driverFactory);
-    this.storage = checkNotNull(storage);
-    this.lifecycle = checkNotNull(lifecycle);
-    this.driver  = checkNotNull(driver);
-    this.driverRef = checkNotNull(driverRef);
-    this.shutdownAfterRunning = shutdownAfterRunning;
-    this.clock = checkNotNull(clock);
+      final DriverFactory driverFactory,
+      final NonVolatileStorage storage,
+      final Lifecycle lifecycle,
+      // TODO(wfarner): The presence of Driver and DriverReference is quite confusing.  Figure out
+      //                a clean way to collapse the duties of DriverReference into DriverImpl.
+      final Driver driver,
+      final DriverReference driverRef,
+      final DelayedActions delayedActions,
+      final Clock clock) {
+
+    Stats.export(new StatImpl<Integer>("framework_registered") {
+      @Override public Integer read() {
+        return registrationAcked.get() ? 1 : 0;
+      }
+    });
+    for (final State state : State.values()) {
+      Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
+        @Override public Integer read() {
+          return (state == stateMachine.getState()) ? 1 : 0;
+        }
+      });
+    }
+
+    final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        try {
+          storage.prepare();
+          stateMachine.transition(State.STORAGE_PREPARED);
+        } catch (RuntimeException e) {
+          stateMachine.transition(State.DEAD);
+          throw e;
+        }
+      }
+    };
+
+    final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        LOG.info("Elected as leading scheduler!");
+        storage.start(new MutateWork.NoResult.Quiet() {
+          @Override protected void execute(MutableStoreProvider storeProvider) {
+            StorageBackfill.backfill(storeProvider, clock);
+          }
+        });
+
+        @Nullable final String frameworkId = storage.consistentRead(
+            new Work.Quiet<String>() {
+              @Override public String apply(StoreProvider storeProvider) {
+                return storeProvider.getSchedulerStore().fetchFrameworkId();
+              }
+            });
+        driverRef.set(driverFactory.apply(frameworkId));
+
+        delayedActions.onRegistrationTimeout(
+            new Runnable() {
+              @Override public void run() {
+                if (!registrationAcked.get()) {
+                  LOG.severe(
+                      "Framework has not been registered within the tolerated delay.");
+                  stateMachine.transition(State.DEAD);
+                }
+              }
+            });
+
+        delayedActions.onAutoFailover(
+            new Runnable() {
+              @Override public void run() {
+                LOG.info("Triggering automatic failover.");
+                stateMachine.transition(State.DEAD);
+              }
+            });
+
+        Protos.Status status = driver.start();
+        LOG.info("Driver started with code " + status);
+        delayedActions.blockingDriverJoin(new Runnable() {
+          @Override public void run() {
+            // Blocks until driver exits.
+            driver.join();
+            stateMachine.transition(State.DEAD);
+          }
+        });
+      }
+    };
+
+    final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> transition) {
+        registrationAcked.set(true);
+        try {
+          leaderControl.get().advertise();
+        } catch (JoinException e) {
+          LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
+          stateMachine.transition(State.DEAD);
+        } catch (InterruptedException e) {
+          LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
+          stateMachine.transition(State.DEAD);
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+
+    final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
+      private final AtomicBoolean invoked = new AtomicBoolean(false);
+      @Override public void execute(Transition<State> transition) {
+        if (!invoked.compareAndSet(false, true)) {
+          LOG.info("Shutdown already invoked, ignoring extra call.");
+          return;
+        }
+
+        // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
+        // resources here.
+        try {
+          LeaderControl control = leaderControl.get();
+          if (control != null) {
+            try {
+              control.leave();
+            } catch (JoinException e) {
+              LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
+            } catch (ServerSet.UpdateException e) {
+              LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
+            }
+          }
+
+          // TODO(wfarner): Re-evaluate tear-down ordering here.  Should the top-level shutdown
+          // be invoked first, or the underlying critical components?
+          driver.stop();
+          storage.stop();
+        } finally {
+          lifecycle.shutdown();
+        }
+      }
+    };
+
+    stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
+        .initialState(State.IDLE)
+        .logTransitions()
+        .addState(
+            Closures.filter(NOT_DEAD, prepareStorage),
+            State.IDLE,
+            State.PREPARING_STORAGE, State.DEAD)
+        .addState(
+            State.PREPARING_STORAGE,
+            State.STORAGE_PREPARED, State.DEAD)
+        .addState(
+            Closures.filter(NOT_DEAD, handleLeading),
+            State.STORAGE_PREPARED,
+            State.LEADER_AWAITING_REGISTRATION, State.DEAD)
+        .addState(
+            Closures.filter(NOT_DEAD, handleRegistered),
+            State.LEADER_AWAITING_REGISTRATION,
+            State.REGISTERED_LEADER, State.DEAD)
+        .addState(
+            State.REGISTERED_LEADER,
+            State.RUNNING, State.DEAD)
+        .addState(
+            State.RUNNING,
+            State.DEAD)
+        .addState(
+            State.DEAD,
+            // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
+            State.DEAD
+        )
+        .onAnyTransition(
+            Closures.filter(IS_DEAD, shutDown))
+        .build();
+
+    this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
   }
 
   /**
@@ -137,144 +329,89 @@ public class SchedulerLifecycle implements EventSubscriber {
    * host a live log replica and start syncing data from the leader via the log until it gets called
    * upon to lead.
    *
-   * @return A candidate that can be offered for leadership of a distributed election.
+   * @return A listener that can be offered for leadership of a distributed election.
    */
-  public SchedulerCandidate prepare() {
-    LOG.info("Preparing storage");
-    storage.prepare();
-    return new SchedulerCandidateImpl();
+  public LeadershipListener prepare() {
+    stateMachine.transition(State.PREPARING_STORAGE);
+    return leadershipListener;
   }
 
   @Subscribe
   public void registered(DriverRegistered event) {
-    registeredLatch.countDown();
-    registeredFlag.set(1);
+    stateMachine.transition(State.REGISTERED_LEADER);
   }
 
   /**
    * Maintains a reference to the driver.
    */
   static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
-    private volatile Optional<SchedulerDriver> driver = Optional.absent();
+    private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
 
     @Override public Optional<SchedulerDriver> get() {
-      return driver;
+      return Optional.fromNullable(driver.get());
     }
 
     private void set(SchedulerDriver ref) {
-      this.driver = Optional.of(ref);
+      driver.set(ref);
     }
   }
 
-  /**
-   * Implementation of the scheduler candidate lifecycle.
-   */
-  private class SchedulerCandidateImpl implements SchedulerCandidate {
-    @Override public void onLeading(LeaderControl control) {
-      LOG.info("Elected as leading scheduler!");
-      try {
-        lead();
-        control.advertise();
-      } catch (Group.JoinException e) {
-        LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
-        lifecycle.shutdown();
-      } catch (InterruptedException e) {
-        LOG.log(Level.SEVERE, "Failed to update endpoint status, shutting down.", e);
-        lifecycle.shutdown();
-        Thread.currentThread().interrupt();
-      } catch (RuntimeException e) {
-        LOG.log(Level.SEVERE, "Unexpected exception attempting to lead, shutting down.", e);
-        lifecycle.shutdown();
-      }
-    }
+  private static class SchedulerCandidateImpl implements LeadershipListener {
+    private final StateMachine<State> stateMachine;
+    private final AtomicReference<LeaderControl> leaderControl;
 
-    private void startDaemonThread(String name, Runnable work) {
-      new ThreadFactoryBuilder()
-          .setNameFormat(name + "-%d")
-          .setDaemon(true)
-          .build()
-          .newThread(work)
-          .start();
-    }
+    SchedulerCandidateImpl(
+        StateMachine<State> stateMachine,
+        AtomicReference<LeaderControl> leaderControl) {
 
-    private void lead() {
-      storage.start(new MutateWork.NoResult.Quiet() {
-        @Override protected void execute(MutableStoreProvider storeProvider) {
-          StorageBackfill.backfill(storeProvider, clock);
-        }
-      });
-      @Nullable final String frameworkId = storage.consistentRead(new Work.Quiet<String>() {
-        @Override public String apply(StoreProvider storeProvider) {
-          return storeProvider.getSchedulerStore().fetchFrameworkId();
-        }
-      });
-
-      driverRef.set(driverFactory.apply(frameworkId));
-
-      startDaemonThread("Driver-Runner", new Runnable() {
-        @Override public void run() {
-          Protos.Status status = driver.run();
-          LOG.info("Driver completed with exit code " + status);
-          if (shutdownAfterRunning) {
-            lifecycle.shutdown();
-          }
-        }
-      });
-
-      startDaemonThread("Driver-Register-Watchdog", new Runnable() {
-        @Override public void run() {
-          LOG.info(String.format("Waiting up to %s for scheduler registration.",
-              MAX_REGISTRATION_DELAY.get()));
-
-          try {
-            boolean registered = registeredLatch.await(
-                MAX_REGISTRATION_DELAY.get().getValue(),
-                MAX_REGISTRATION_DELAY.get().getUnit().getTimeUnit());
-            if (!registered) {
-              LOG.severe("Framework has not been registered within the tolerated delay, quitting.");
-              lifecycle.shutdown();
-            }
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Delayed registration check interrupted.", e);
-            Thread.currentThread().interrupt();
-          }
-        }
-      });
+      this.stateMachine = stateMachine;
+      this.leaderControl = leaderControl;
+    }
 
-      startDaemonThread("Leader-Assassin", new Runnable() {
-        @Override public void run() {
-          try {
-            Thread.sleep(MAX_LEADING_DURATION.get().as(Time.MILLISECONDS));
-            LOG.info(
-                "Leader has been active for " + MAX_LEADING_DURATION.get() + ", forcing failover.");
-            onDefeated(null);
-          } catch (InterruptedException e) {
-            LOG.warning("Leader assassin thread interrupted.");
-            Thread.currentThread().interrupt();
-          }
-        }
-      });
+    @Override public void onLeading(LeaderControl control) {
+      leaderControl.set(control);
+      stateMachine.transition(State.LEADER_AWAITING_REGISTRATION);
     }
 
     @Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
-      LOG.info("Lost leadership, committing suicide.");
+      LOG.severe("Lost leadership, committing suicide.");
+      stateMachine.transition(State.DEAD);
+    }
+  }
 
-      try {
-        if (status != null) {
-          status.leave();
-        }
+  public static class LeadingOptions {
+    private final Amount<Long, Time> registrationDelayLimit;
+    private final Amount<Long, Time> leadingTimeLimit;
 
-        driver.stop(); // shut down incoming offers
-        storage.stop();
-      } catch (ServerSet.UpdateException e) {
-        LOG.log(Level.WARNING, "Failed to leave server set.", e);
-      } finally {
-        lifecycle.shutdown(); // shut down the server
-      }
+    /**
+     * Creates a new collection of options for tuning leadership behavior.
+     *
+     * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
+     *                               complete.
+     * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating.
+     */
+    public LeadingOptions(
+        Amount<Long, Time> registrationDelayLimit,
+        Amount<Long, Time> leadingTimeLimit) {
+
+      Preconditions.checkArgument(
+          registrationDelayLimit.getValue() >= 0,
+          "Registration delay limit must be positive.");
+      Preconditions.checkArgument(
+          leadingTimeLimit.getValue() >= 0,
+          "Leading time limit must be positive.");
+
+      this.registrationDelayLimit = checkNotNull(registrationDelayLimit);
+      this.leadingTimeLimit = checkNotNull(leadingTimeLimit);
     }
+  }
 
-    @Override public void awaitShutdown() {
-      lifecycle.awaitShutdown();
-    }
+  @VisibleForTesting
+  interface DelayedActions {
+    void blockingDriverJoin(Runnable runnable);
+
+    void onAutoFailover(Runnable runnable);
+
+    void onRegistrationTimeout(Runnable runnable);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
index bd7929d..44aeadc 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
@@ -16,13 +16,17 @@
 package com.twitter.aurora.scheduler;
 
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
 import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 
@@ -32,6 +36,7 @@ import org.apache.mesos.SchedulerDriver;
 import com.twitter.aurora.scheduler.Driver.DriverImpl;
 import com.twitter.aurora.scheduler.PulseMonitor.PulseMonitorImpl;
 import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import com.twitter.aurora.scheduler.events.PubsubEventModule;
 import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
@@ -54,6 +59,16 @@ public class SchedulerModule extends AbstractModule {
   @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
   private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
 
+  @CmdLine(name = "max_registration_delay",
+      help = "Max allowable delay to allow the driver to register before aborting")
+  private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @CmdLine(name = "max_leading_duration",
+      help = "After leading for this duration, the scheduler should commit suicide.")
+  private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
+      Arg.create(Amount.of(1L, Time.DAYS));
+
   @Override
   protected void configure() {
     bind(Driver.class).to(DriverImpl.class);
@@ -75,7 +90,19 @@ public class SchedulerModule extends AbstractModule {
     bind(GcExecutorLauncher.class).in(Singleton.class);
     bind(UserTaskLauncher.class).in(Singleton.class);
 
-    bind(SchedulerLifecycle.class).in(Singleton.class);
+    install(new PrivateModule() {
+      @Override protected void configure() {
+        bind(LeadingOptions.class).toInstance(
+            new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
+          final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
+              1,
+              new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
+        bind(ScheduledExecutorService.class).toInstance(executor);
+        bind(SchedulerLifecycle.class).in(Singleton.class);
+        expose(SchedulerLifecycle.class);
+      }
+    });
+
     PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
     PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
index 22f7217..693c364 100644
--- a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
@@ -40,7 +40,6 @@ import com.twitter.aurora.scheduler.DriverFactory;
 import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
 import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
 import com.twitter.aurora.scheduler.SchedulerLifecycle;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.ShutdownOnDriverExit;
 import com.twitter.aurora.scheduler.cron.CronPredictor;
 import com.twitter.aurora.scheduler.cron.CronScheduler;
 import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
@@ -56,6 +55,7 @@ import com.twitter.aurora.scheduler.thrift.ThriftModule;
 import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
 import com.twitter.common.application.AbstractApplication;
 import com.twitter.common.application.AppLauncher;
+import com.twitter.common.application.Lifecycle;
 import com.twitter.common.application.modules.HttpModule;
 import com.twitter.common.application.modules.LocalServiceRegistry;
 import com.twitter.common.application.modules.LogModule;
@@ -68,6 +68,7 @@ import com.twitter.common.inject.Bindings;
 import com.twitter.common.logging.RootLogConfig;
 import com.twitter.common.zookeeper.Group;
 import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
 import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
 import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
 import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
@@ -133,6 +134,7 @@ public class SchedulerMain extends AbstractApplication {
   @Inject private SingletonService schedulerService;
   @Inject private LocalServiceRegistry serviceRegistry;
   @Inject private SchedulerLifecycle schedulerLifecycle;
+  @Inject private Lifecycle appLifecycle;
   @Inject private Optional<RootLogConfig.Configuration> glogConfig;
 
   private static Iterable<? extends Module> getSystemModules() {
@@ -183,7 +185,6 @@ public class SchedulerMain extends AbstractApplication {
         @Override protected void configure() {
           bind(DriverFactory.class).to(DriverFactoryImpl.class);
           bind(DriverFactoryImpl.class).in(Singleton.class);
-          bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(true);
           install(new MesosLogStreamModule(zkClientConfig));
         }
       };
@@ -205,7 +206,6 @@ public class SchedulerMain extends AbstractApplication {
           }
         });
         bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
-        bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(true);
       }
     };
 
@@ -227,7 +227,7 @@ public class SchedulerMain extends AbstractApplication {
       LOG.warning("Running without expected glog configuration.");
     }
 
-    SchedulerLifecycle.SchedulerCandidate candidate = schedulerLifecycle.prepare();
+    LeadershipListener leaderListener = schedulerLifecycle.prepare();
 
     Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
     if (!primarySocket.isPresent()) {
@@ -235,7 +235,10 @@ public class SchedulerMain extends AbstractApplication {
     }
 
     try {
-      schedulerService.lead(primarySocket.get(), serviceRegistry.getAuxiliarySockets(), candidate);
+      schedulerService.lead(
+          primarySocket.get(),
+          serviceRegistry.getAuxiliarySockets(),
+          leaderListener);
     } catch (Group.WatchException e) {
       throw new IllegalStateException("Failed to watch group and lead service.", e);
     } catch (Group.JoinException e) {
@@ -244,7 +247,7 @@ public class SchedulerMain extends AbstractApplication {
       throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
     }
 
-    candidate.awaitShutdown();
+    appLifecycle.awaitShutdown();
   }
 
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
index ceef9d3..d735828 100644
--- a/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/main/java/com/twitter/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -27,9 +27,9 @@ import com.twitter.aurora.scheduler.storage.JobStore;
 import com.twitter.aurora.scheduler.storage.LockStore;
 import com.twitter.aurora.scheduler.storage.QuotaStore;
 import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.Storage;
 import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
 import com.twitter.aurora.scheduler.storage.Storage.Work;
 import com.twitter.aurora.scheduler.storage.TaskStore;
@@ -54,7 +54,7 @@ public class StorageTestUtil {
   public final JobStore.Mutable jobStore;
   public final LockStore.Mutable lockStore;
   public final SchedulerStore.Mutable schedulerStore;
-  public final Storage storage;
+  public final NonVolatileStorage storage;
 
   /**
    * Creates a new storage test utility.
@@ -70,7 +70,7 @@ public class StorageTestUtil {
     this.jobStore = easyMock.createMock(JobStore.Mutable.class);
     this.lockStore = easyMock.createMock(LockStore.Mutable.class);
     this.schedulerStore = easyMock.createMock(SchedulerStore.Mutable.class);
-    this.storage = easyMock.createMock(Storage.class);
+    this.storage = easyMock.createMock(NonVolatileStorage.class);
   }
 
   private <T> IExpectationSetters<T> expectConsistentRead() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/DriverTest.java b/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
index 5609b0b..f6f7c23 100644
--- a/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/DriverTest.java
@@ -62,11 +62,11 @@ public class DriverTest extends EasyMockTest {
   @Test
   public void testMultipleStops() {
     expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
-    expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.run());
+    assertEquals(DRIVER_RUNNING, driver.start());
     driver.stop();
     driver.stop();
   }
@@ -74,24 +74,24 @@ public class DriverTest extends EasyMockTest {
   @Test
   public void testStop() {
     expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(2);
-    expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.run());
+    assertEquals(DRIVER_RUNNING, driver.start());
     driver.stop();
   }
 
   @Test
   public void testNormalLifecycle() {
     expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver)).times(4);
-    expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
     expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.run());
+    assertEquals(DRIVER_RUNNING, driver.start());
     driver.killTask(TASK_1);
     driver.killTask(TASK_2);
     driver.stop();
@@ -107,10 +107,10 @@ public class DriverTest extends EasyMockTest {
   @Test(expected = IllegalStateException.class)
   public void testOnlyOneRunAllowed() {
     expect(driverSupplier.get()).andReturn(Optional.of(schedulerDriver));
-    expect(schedulerDriver.run()).andReturn(DRIVER_RUNNING);
+    expect(schedulerDriver.start()).andReturn(DRIVER_RUNNING);
     control.replay();
 
-    assertEquals(DRIVER_RUNNING, driver.run());
-    driver.run();
+    assertEquals(DRIVER_RUNNING, driver.start());
+    driver.start();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
new file mode 100644
index 0000000..3336875
--- /dev/null
+++ b/src/test/java/com/twitter/aurora/scheduler/SchedulerLifecycleTest.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.SchedulerDriver;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DelayedActions;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import com.twitter.aurora.scheduler.storage.testing.StorageTestUtil;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Command;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+public class SchedulerLifecycleTest extends EasyMockTest {
+
+  private static final String FRAMEWORK_ID = "framework id";
+
+  private DriverFactory driverFactory;
+  private StorageTestUtil storageUtil;
+  private Command shutdownRegistry;
+  private Driver driver;
+  private DriverReference driverRef;
+  private LeaderControl leaderControl;
+  private SchedulerDriver schedulerDriver;
+  private DelayedActions delayedActions;
+
+  private SchedulerLifecycle schedulerLifecycle;
+
+  @Before
+  public void setUp() {
+    driverFactory = createMock(DriverFactory.class);
+    storageUtil = new StorageTestUtil(this);
+    shutdownRegistry = createMock(Command.class);
+    driver = createMock(Driver.class);
+    driverRef = new DriverReference();
+    leaderControl = createMock(LeaderControl.class);
+    schedulerDriver = createMock(SchedulerDriver.class);
+    delayedActions = createMock(DelayedActions.class);
+    schedulerLifecycle = new SchedulerLifecycle(
+        driverFactory,
+        storageUtil.storage,
+        new Lifecycle(shutdownRegistry, new UncaughtExceptionHandler() {
+          @Override public void uncaughtException(Thread t, Throwable e) {
+            fail(e.getMessage());
+          }
+        }),
+        driver,
+        driverRef,
+        delayedActions,
+        createMock(Clock.class));
+  }
+
+  @Test
+  public void testAutoFailover() throws Throwable {
+    // Test that when timed failover is initiated, cleanup is done in a way that should allow the
+    // application to tear down cleanly.  Specifically, neglecting to call leaderControl.leave()
+    // can result in a lame duck scheduler process.
+
+    storageUtil.storage.prepare();
+
+    storageUtil.storage.start(EasyMock.<Quiet>anyObject());
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(FRAMEWORK_ID);
+    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
+    Capture<Runnable> triggerFailoverCapture = createCapture();
+    delayedActions.onAutoFailover(capture(triggerFailoverCapture));
+    delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
+    expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+    delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+
+    leaderControl.advertise();
+    leaderControl.leave();
+    driver.stop();
+    storageUtil.storage.stop();
+    shutdownRegistry.execute();
+
+    control.replay();
+
+    LeadershipListener leaderListener = schedulerLifecycle.prepare();
+    leaderListener.onLeading(leaderControl);
+    schedulerLifecycle.registered(new DriverRegistered());
+    triggerFailoverCapture.getValue().run();
+  }
+
+  @Test
+  public void testDefeatedBeforeRegistered() throws Throwable {
+    storageUtil.storage.prepare();
+    storageUtil.storage.start(EasyMock.<Quiet>anyObject());
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(FRAMEWORK_ID);
+    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
+    delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
+    delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
+    expect(driver.start()).andReturn(Status.DRIVER_RUNNING);
+    delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
+
+    // The important piece is what's absent: no leaderControl.advertise() call is expected.
+    leaderControl.leave();
+    driver.stop();
+    storageUtil.storage.stop();
+    shutdownRegistry.execute();
+
+    control.replay();
+
+    LeadershipListener leaderListener = schedulerLifecycle.prepare();
+    leaderListener.onLeading(leaderControl);
+    leaderListener.onDefeated(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ad999b9f/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java b/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
index 4c381b9..1242d46 100644
--- a/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/com/twitter/aurora/scheduler/app/SchedulerIT.java
@@ -72,7 +72,6 @@ import com.twitter.aurora.gen.storage.Transaction;
 import com.twitter.aurora.gen.storage.storageConstants;
 import com.twitter.aurora.scheduler.DriverFactory;
 import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.ShutdownOnDriverExit;
 import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
 import com.twitter.aurora.scheduler.log.Log;
 import com.twitter.aurora.scheduler.log.Log.Entry;
@@ -187,7 +186,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
             }
         );
         bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
-        bind(Boolean.class).annotatedWith(ShutdownOnDriverExit.class).toInstance(false);
         install(new BackupModule(backupDir, SnapshotStoreImpl.class));
       }
     };
@@ -330,8 +328,12 @@ public class SchedulerIT extends BaseZooKeeperTest {
     logStream.close();
     expectLastCall().anyTimes();
 
-    expect(driver.run()).andAnswer(new IAnswer<Status>() {
-      @Override public Status answer() throws InterruptedException {
+    final AtomicReference<Scheduler> scheduler = Atomics.newReference();
+    expect(driver.start()).andAnswer(new IAnswer<Status>() {
+      @Override public Status answer() {
+        scheduler.get().registered(driver,
+            FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
+            MasterInfo.getDefaultInstance());
         return Status.DRIVER_RUNNING;
       }
     });
@@ -339,13 +341,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
     control.replay();
     startScheduler();
 
+    scheduler.set(getScheduler());
     awaitSchedulerReady();
 
-    Scheduler scheduler = getScheduler();
-    scheduler.registered(driver,
-        FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
-        MasterInfo.getDefaultInstance());
-
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_RUNNING").read().longValue());