You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/08 00:11:31 UTC

[3/3] incubator-aurora git commit: Simplify management of the driver lifecycle using AbstractIdleService.

Simplify management of the driver lifecycle using AbstractIdleService.

Bugs closed: AURORA-920

Reviewed at https://reviews.apache.org/r/27746/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/66bd6fec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/66bd6fec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/66bd6fec

Branch: refs/heads/master
Commit: 66bd6fec9340eade40a61784ed464c3e94b9d784
Parents: 03cb0d1
Author: Bill Farner <wf...@apache.org>
Authored: Fri Nov 7 15:08:35 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 7 15:08:35 2014 -0800

----------------------------------------------------------------------
 config/legacy_untested_classes.txt              |   2 +
 .../org/apache/aurora/scheduler/Driver.java     | 157 ---------
 .../apache/aurora/scheduler/DriverFactory.java  | 185 -----------
 .../aurora/scheduler/MesosSchedulerImpl.java    | 252 --------------
 .../aurora/scheduler/MesosTaskFactory.java      | 156 ---------
 .../aurora/scheduler/SchedulerLifecycle.java    |  42 +--
 .../aurora/scheduler/SchedulerModule.java       |  16 -
 .../apache/aurora/scheduler/app/AppModule.java  |   2 +
 .../aurora/scheduler/app/SchedulerMain.java     |  11 +-
 .../aurora/scheduler/async/AsyncModule.java     |   2 +
 .../scheduler/async/GcExecutorLauncher.java     |   2 +-
 .../scheduler/async/JobUpdateHistoryPruner.java |   1 -
 .../aurora/scheduler/async/KillRetry.java       |   2 +-
 .../aurora/scheduler/async/OfferQueue.java      |   2 +-
 .../aurora/scheduler/async/TaskTimeout.java     |  62 ++--
 .../mesos/CommandLineDriverSettingsModule.java  | 159 +++++++++
 .../apache/aurora/scheduler/mesos/Driver.java   |  57 ++++
 .../aurora/scheduler/mesos/DriverFactory.java   |  32 ++
 .../scheduler/mesos/DriverFactoryImpl.java      |  44 +++
 .../aurora/scheduler/mesos/DriverSettings.java  |  53 +++
 .../scheduler/mesos/LibMesosLoadingModule.java  |  26 ++
 .../scheduler/mesos/MesosSchedulerImpl.java     | 253 ++++++++++++++
 .../scheduler/mesos/MesosTaskFactory.java       | 158 +++++++++
 .../scheduler/mesos/SchedulerDriverModule.java  |  45 +++
 .../scheduler/mesos/SchedulerDriverService.java | 141 ++++++++
 .../scheduler/state/StateManagerImpl.java       |   2 +-
 .../aurora/scheduler/state/StateModule.java     |   4 +-
 .../aurora/scheduler/state/TaskAssigner.java    |   2 +-
 .../scheduler/storage/log/EntrySerializer.java  |   1 +
 .../storage/log/StreamManagerImpl.java          |   1 +
 .../aurora/scheduler/DriverFactoryImplTest.java |  65 ----
 .../org/apache/aurora/scheduler/DriverTest.java |  99 ------
 .../scheduler/MesosSchedulerImplTest.java       | 330 ------------------
 .../scheduler/MesosTaskFactoryImplTest.java     | 104 ------
 .../scheduler/SchedulerLifecycleTest.java       |  32 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  31 +-
 .../aurora/scheduler/app/local/FakeMaster.java  |  45 ++-
 .../scheduler/app/local/LocalSchedulerMain.java |  32 +-
 .../aurora/scheduler/async/AsyncModuleTest.java |   2 +-
 .../scheduler/async/GcExecutorLauncherTest.java |   2 +-
 .../aurora/scheduler/async/KillRetryTest.java   |   2 +-
 .../scheduler/async/OfferQueueImplTest.java     |   2 +-
 .../aurora/scheduler/async/TaskGroupsTest.java  |   1 -
 .../scheduler/async/TaskSchedulerTest.java      |   2 +-
 .../aurora/scheduler/async/TaskTimeoutTest.java |  46 ++-
 .../CommandLineDriverSettingsModuleTest.java    |  63 ++++
 .../scheduler/mesos/MesosSchedulerImplTest.java | 331 +++++++++++++++++++
 .../mesos/MesosTaskFactoryImplTest.java         | 105 ++++++
 .../mesos/SchedulerDriverServiceTest.java       | 148 +++++++++
 .../scheduler/quota/QuotaManagerImplTest.java   |   1 -
 .../scheduler/sla/MetricCalculatorTest.java     |   1 -
 .../scheduler/state/StateManagerImplTest.java   |   2 +-
 .../scheduler/state/TaskAssignerImplTest.java   |   2 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   2 +-
 54 files changed, 1805 insertions(+), 1517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/config/legacy_untested_classes.txt
----------------------------------------------------------------------
diff --git a/config/legacy_untested_classes.txt b/config/legacy_untested_classes.txt
index f1d1921..33c1d6e 100644
--- a/config/legacy_untested_classes.txt
+++ b/config/legacy_untested_classes.txt
@@ -53,6 +53,8 @@ org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule
 org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$3
 org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$4
 org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$5
+org/apache/aurora/scheduler/mesos/DriverFactoryImpl
+org/apache/aurora/scheduler/mesos/LibMesosLoadingModule
 org/apache/aurora/scheduler/state/MaintenanceController$MaintenanceControllerImpl$4
 org/apache/aurora/scheduler/state/MaintenanceController$MaintenanceControllerImpl$8
 org/apache/aurora/scheduler/stats/AsyncStatsModule$OfferAdapter$1

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
deleted file mode 100644
index b07378f..0000000
--- a/src/main/java/org/apache/aurora/scheduler/Driver.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.StateMachine;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-
-/**
- * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely:
- * <pre>
- *   (run -> kill*)? -> stop*
- * </pre>
- *
- * Also ensures the driver is only asked for when needed.
- */
-public interface Driver {
-
-  /**
-   * Launches a task.
-   *
-   * @param offerId ID of the resource offer to accept with the task.
-   * @param task Task to launch.
-   */
-  void launchTask(OfferID offerId, TaskInfo task);
-
-  /**
-   * Declines a resource offer.
-   *
-   * @param offerId ID of the offer to decline.
-   */
-  void declineOffer(OfferID offerId);
-
-  /**
-   * Sends a kill task request for the given {@code taskId} to the mesos master.
-   *
-   * @param taskId The id of the task to kill.
-   */
-  void killTask(String taskId);
-
-  /**
-   * Stops the underlying driver if it is running, otherwise does nothing.
-   */
-  void stop();
-
-  interface SettableDriver extends Driver {
-    void initialize(SchedulerDriver driver);
-  }
-
-  /**
-   * Mesos driver implementation.
-   */
-  class DriverImpl implements SettableDriver {
-    private static final Logger LOG = Logger.getLogger(Driver.class.getName());
-
-    /**
-     * Driver states.
-     */
-    enum State {
-      INIT,
-      RUNNING,
-      STOPPED
-    }
-
-    private final StateMachine<State> stateMachine;
-    private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
-    private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
-
-    /**
-     * Creates a driver manager that will only ask for the underlying mesos driver when actually
-     * needed.
-     */
-    @Inject
-    DriverImpl() {
-      this.stateMachine =
-          StateMachine.<State>builder("scheduler_driver")
-              .initialState(State.INIT)
-              .addState(State.INIT, State.RUNNING, State.STOPPED)
-              .addState(State.RUNNING, State.STOPPED)
-              .logTransitions()
-              .throwOnBadTransition(true)
-              .build();
-    }
-
-    private synchronized SchedulerDriver get(State expected) {
-      // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
-      stateMachine.checkState(expected);
-      return Objects.requireNonNull(driverRef.get());
-    }
-
-    @Override
-    public void launchTask(OfferID offerId, TaskInfo task) {
-      get(State.RUNNING).launchTasks(ImmutableList.of(offerId), ImmutableList.of(task));
-    }
-
-    @Override
-    public void declineOffer(OfferID offerId) {
-      get(State.RUNNING).declineOffer(offerId);
-    }
-
-    @Override
-    public void initialize(SchedulerDriver driver) {
-      Objects.requireNonNull(driver);
-      stateMachine.checkState(State.INIT);
-      Preconditions.checkState(driverRef.compareAndSet(null, driver));
-      stateMachine.transition(State.RUNNING);
-    }
-
-    @Override
-    public synchronized void stop() {
-      if (stateMachine.getState() == State.RUNNING) {
-        SchedulerDriver driver = get(State.RUNNING);
-        driver.stop(true /* failover */);
-        stateMachine.transition(State.STOPPED);
-      }
-    }
-
-    @Override
-    public void killTask(String taskId) {
-      SchedulerDriver driver = get(State.RUNNING);
-      Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
-
-      if (status != DRIVER_RUNNING) {
-        LOG.severe(String.format("Attempt to kill task %s failed with code %s",
-            taskId, status));
-        killFailures.incrementAndGet();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
deleted file mode 100644
index 9cc04a8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.protobuf.ByteString;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos.Credential;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-/**
- * Factory to create scheduler driver instances.
- */
-public interface DriverFactory extends Function<String, SchedulerDriver> {
-
-  class DriverFactoryImpl implements DriverFactory {
-    private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
-
-    @NotNull
-    @CmdLine(name = "mesos_master_address",
-        help = "Address for the mesos master, can be a socket address or zookeeper path.")
-    private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
-
-    @VisibleForTesting
-    static final String PRINCIPAL_KEY = "aurora_authentication_principal";
-    @VisibleForTesting
-    static final String SECRET_KEY = "aurora_authentication_secret";
-    @CmdLine(name = "framework_authentication_file",
-        help = "Properties file which contains framework credentials to authenticate with Mesos"
-            + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
-            + "'" + SECRET_KEY + "'.")
-    private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
-
-    @CmdLine(name = "framework_failover_timeout",
-        help = "Time after which a framework is considered deleted.  SHOULD BE VERY HIGH.")
-    private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
-        Arg.create(Amount.of(21L, Time.DAYS));
-
-    /**
-     * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
-     * attempt to write checkpoints when required by a task's framework. These checkpoints allow
-     * executors to be reattached rather than killed when a slave is restarted.
-     *
-     * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
-     * enabled.
-     *
-     * Behavior is as follows:
-     * (Scheduler -require_slave_checkpoint=true,  Slave --checkpoint=true):
-     *   Tasks will launch.        Checkpoints will be written.
-     * (Scheduler -require_slave_checkpoint=true,   Slave --checkpoint=false):
-     *   Tasks WILL NOT launch.
-     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=true):
-     *   Tasks will launch.        Checkpoints will not be written.
-     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=false):
-     *   Tasks will launch.        Checkpoints will not be written.
-     *
-     * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
-     * is resolved.
-     */
-    @CmdLine(name = "require_slave_checkpoint",
-        help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
-            + "slave restart should not kill executors, but the scheduler will not be able to "
-            + "launch tasks on slaves without --checkpoint=true in their command lines. See "
-            + "DriverFactory.java for more information.")
-    private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
-
-    @CmdLine(name = "executor_user",
-        help = "User to start the executor. Defaults to \"root\". "
-            + "Set this to an unprivileged user if the mesos master was started with "
-            + "\"--no-root_submissions\". If set to anything other than \"root\", the executor "
-            + "will ignore the \"role\" setting for jobs since it can't use setuid() anymore. "
-            + "This means that all your jobs will run under the specified user and the user has "
-            + "to exist on the mesos slaves.")
-    private static final Arg<String> EXECUTOR_USER = Arg.create("root");
-
-    private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
-
-    private final Provider<Scheduler> scheduler;
-
-    @Inject
-    DriverFactoryImpl(Provider<Scheduler> scheduler) {
-      this.scheduler = Objects.requireNonNull(scheduler);
-    }
-
-    @VisibleForTesting
-    static Properties parseCredentials(InputStream credentialsStream) {
-      Properties properties = new Properties();
-      try {
-        properties.load(credentialsStream);
-      } catch (IOException e) {
-        LOG.severe("Unable to load authentication file");
-        throw Throwables.propagate(e);
-      }
-      Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
-          "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
-      Preconditions.checkState(properties.containsKey(SECRET_KEY),
-          "The framework authentication file is missing the key: %s", SECRET_KEY);
-      return properties;
-    }
-
-    @Override
-    public SchedulerDriver apply(@Nullable String frameworkId) {
-      LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
-
-      FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
-          .setUser(EXECUTOR_USER.get())
-          .setName(TWITTER_FRAMEWORK_NAME)
-          .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
-          .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
-
-      if (frameworkId == null) {
-        LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
-      } else {
-        LOG.info("Found persisted framework ID: " + frameworkId);
-        frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
-      }
-
-      if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
-        Properties properties;
-        try {
-          properties = parseCredentials(new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get()));
-        } catch (FileNotFoundException e) {
-          LOG.severe("Authentication File not Found");
-          throw Throwables.propagate(e);
-        }
-
-        LOG.info(String.format("Connecting to master using authentication (principal: %s).",
-            properties.get(PRINCIPAL_KEY)));
-
-        Credential credential = Credential.newBuilder()
-            .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
-            .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
-            .build();
-
-        return new MesosSchedulerDriver(
-            scheduler.get(),
-            frameworkInfo.build(),
-            MESOS_MASTER_ADDRESS.get(),
-            credential);
-      } else {
-        LOG.warning("Connecting to master without authentication!");
-        return new MesosSchedulerDriver(
-            scheduler.get(),
-            frameworkInfo.build(),
-            MESOS_MASTER_ADDRESS.get());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
deleted file mode 100644
index c424ecd..0000000
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.GuiceUtils.AllowUnchecked;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-/**
- * Location for communication with mesos.
- */
-class MesosSchedulerImpl implements Scheduler {
-  private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
-
-  private final AtomicLong totalResourceOffers = Stats.exportLong("scheduler_resource_offers");
-  private final AtomicLong totalFailedStatusUpdates = Stats.exportLong("scheduler_status_updates");
-  private final AtomicLong totalFrameworkDisconnects =
-      Stats.exportLong("scheduler_framework_disconnects");
-  private final AtomicLong totalFrameworkReregisters =
-      Stats.exportLong("scheduler_framework_reregisters");
-  private final AtomicLong totalLostExecutors = Stats.exportLong("scheduler_lost_executors");
-
-  private final List<TaskLauncher> taskLaunchers;
-
-  private final Storage storage;
-  private final Lifecycle lifecycle;
-  private final EventSink eventSink;
-  private final Executor executor;
-  private volatile boolean isRegistered = false;
-
-  /**
-   * Binding annotation for the executor the incoming Mesos message handler uses.
-   */
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  @interface SchedulerExecutor { }
-
-  /**
-   * Creates a new scheduler.
-   *
-   * @param storage Store to save host attributes into.
-   * @param lifecycle Application lifecycle manager.
-   * @param taskLaunchers Task launchers, which will be used in order.  Calls to
-   *                      {@link TaskLauncher#willUse(Offer)} and
-   *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
-   *                      launchers, ceasing after the first match (based on a return value of
-   *                      {@code true}.
-   * @param eventSink Pubsub sink to send driver status changes to.
-   * @param executor Executor for async work
-   */
-  @Inject
-  public MesosSchedulerImpl(
-      Storage storage,
-      final Lifecycle lifecycle,
-      List<TaskLauncher> taskLaunchers,
-      EventSink eventSink,
-      @SchedulerExecutor Executor executor) {
-
-    this.storage = requireNonNull(storage);
-    this.lifecycle = requireNonNull(lifecycle);
-    this.taskLaunchers = requireNonNull(taskLaunchers);
-    this.eventSink = requireNonNull(eventSink);
-    this.executor = requireNonNull(executor);
-  }
-
-  @Override
-  public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
-    LOG.info("Received notification of lost slave: " + slaveId);
-  }
-
-  @Override
-  public void registered(
-      SchedulerDriver driver,
-      final FrameworkID frameworkId,
-      MasterInfo masterInfo) {
-
-    LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
-      }
-    });
-    isRegistered = true;
-    eventSink.post(new DriverRegistered());
-  }
-
-  @Override
-  public void disconnected(SchedulerDriver schedulerDriver) {
-    LOG.warning("Framework disconnected.");
-    totalFrameworkDisconnects.incrementAndGet();
-    eventSink.post(new DriverDisconnected());
-  }
-
-  @Override
-  public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
-    LOG.info("Framework re-registered with master " + masterInfo);
-    totalFrameworkReregisters.incrementAndGet();
-  }
-
-  @Timed("scheduler_resource_offers")
-  @Override
-  public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
-    Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
-
-    // Store all host attributes in a single write operation to prevent other threads from
-    // securing the storage lock between saves.  We also save the host attributes before passing
-    // offers elsewhere to ensure that host attributes are available before attempting to
-    // schedule tasks associated with offers.
-    // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
-    //                offers when the host attributes cannot be found. (AURORA-137)
-
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        storage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          protected void execute(MutableStoreProvider storeProvider) {
-            for (final Offer offer : offers) {
-              storeProvider.getAttributeStore()
-                  .saveHostAttributes(Conversions.getAttributes(offer));
-            }
-          }
-        });
-
-        for (Offer offer : offers) {
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, String.format("Received offer: %s", offer));
-          }
-          totalResourceOffers.incrementAndGet();
-          for (TaskLauncher launcher : taskLaunchers) {
-            if (launcher.willUse(offer)) {
-              break;
-            }
-          }
-        }
-      }
-    });
-  }
-
-  @Override
-  public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
-    LOG.info("Offer rescinded: " + offerId);
-    for (TaskLauncher launcher : taskLaunchers) {
-      launcher.cancelOffer(offerId);
-    }
-  }
-
-  @AllowUnchecked
-  @Timed("scheduler_status_update")
-  @Override
-  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
-    String info = status.hasData() ? status.getData().toStringUtf8() : null;
-    String infoMsg = info == null ? "" : " with info " + info;
-    String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
-    LOG.info("Received status update for task " + status.getTaskId().getValue()
-        + " in state " + status.getState() + infoMsg + coreMsg);
-
-    try {
-      for (TaskLauncher launcher : taskLaunchers) {
-        if (launcher.statusUpdate(status)) {
-          return;
-        }
-      }
-    } catch (SchedulerException e) {
-      LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
-      // We re-throw the exception here in an effort to NACK the status update and trigger an
-      // abort of the driver.  Previously we directly issued driver.abort(), but the re-entrancy
-      // guarantees of that are uncertain (and we believe it was not working).  However, this
-      // was difficult to discern since logging is unreliable during JVM shutdown and we would not
-      // see the above log message.
-      throw e;
-    }
-
-    LOG.warning("Unhandled status update " + status);
-    totalFailedStatusUpdates.incrementAndGet();
-  }
-
-  @Override
-  public void error(SchedulerDriver driver, String message) {
-    LOG.severe("Received error message: " + message);
-    lifecycle.shutdown();
-  }
-
-  @Override
-  public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
-      int status) {
-
-    LOG.info("Lost executor " + executorID);
-    totalLostExecutors.incrementAndGet();
-  }
-
-  @Timed("scheduler_framework_message")
-  @Override
-  public void frameworkMessage(
-      SchedulerDriver driver,
-      ExecutorID executorID,
-      SlaveID slave,
-      byte[] data) {
-
-    LOG.warning("Ignoring framework message.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
deleted file mode 100644
index ad07bca..0000000
--- a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-import com.twitter.common.quantity.Data;
-
-import org.apache.aurora.Protobufs;
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.scheduler.base.CommandUtil;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import static java.util.Objects.requireNonNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * A factory to create mesos task objects.
- */
-public interface MesosTaskFactory {
-
-  /**
-   * Creates a mesos task object.
-   *
-   * @param task Assigned task to translate into a task object.
-   * @param slaveId Id of the slave the task is being assigned to.
-   * @return A new task.
-   * @throws SchedulerException If the task could not be encoded.
-   */
-  TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
-
-  class ExecutorConfig {
-    private final String executorPath;
-
-    public ExecutorConfig(String executorPath) {
-      this.executorPath = checkNotBlank(executorPath);
-    }
-
-    String getExecutorPath() {
-      return executorPath;
-    }
-  }
-
-  class MesosTaskFactoryImpl implements MesosTaskFactory {
-    private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
-    private static final String EXECUTOR_PREFIX = "thermos-";
-
-    /**
-     * Name to associate with task executors.
-     */
-    @VisibleForTesting
-    static final String EXECUTOR_NAME = "aurora.task";
-
-    private final String executorPath;
-
-    @Inject
-    MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
-      this.executorPath = executorConfig.getExecutorPath();
-    }
-
-    @VisibleForTesting
-    static ExecutorID getExecutorId(String taskId) {
-      return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
-    }
-
-    public static String getJobSourceName(IJobKey jobkey) {
-      return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
-    }
-
-    public static String getJobSourceName(ITaskConfig task) {
-      return getJobSourceName(task.getJob());
-    }
-
-    public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
-      return String.format("%s.%s", getJobSourceName(task), instanceId);
-    }
-
-    @Override
-    public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
-      requireNonNull(task);
-      byte[] taskInBytes;
-      try {
-        taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
-      } catch (ThriftBinaryCodec.CodingException e) {
-        LOG.log(Level.SEVERE, "Unable to serialize task.", e);
-        throw new SchedulerException("Internal error.", e);
-      }
-
-      ITaskConfig config = task.getTask();
-      // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
-      List<Resource> resources = Resources.from(config)
-          .toResourceList(task.isSetAssignedPorts()
-              ? ImmutableSet.copyOf(task.getAssignedPorts().values())
-              : ImmutableSet.<Integer>of());
-
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.fine("Setting task resources to "
-            + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
-      }
-      TaskInfo.Builder taskBuilder =
-          TaskInfo.newBuilder()
-              .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
-              .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
-              .setSlaveId(slaveId)
-              .addAllResources(resources)
-              .setData(ByteString.copyFrom(taskInBytes));
-
-      ExecutorInfo executor = ExecutorInfo.newBuilder()
-          .setCommand(CommandUtil.create(executorPath))
-          .setExecutorId(getExecutorId(task.getTaskId()))
-          .setName(EXECUTOR_NAME)
-          .setSource(getInstanceSourceName(config, task.getInstanceId()))
-          .addResources(
-              Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
-          .addResources(Resources.makeMesosResource(
-              Resources.RAM_MB,
-              ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
-          .build();
-      return taskBuilder
-          .setExecutor(executor)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 453f22a..e741913 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -30,7 +30,6 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -55,18 +54,14 @@ import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 
 import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.StorageBackfill;
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
 
 import static java.util.Objects.requireNonNull;
 
@@ -125,18 +120,11 @@ public class SchedulerLifecycle implements EventSubscriber {
   private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
   private final StateMachine<State> stateMachine;
 
-  // The local driver reference, distinct from the global SettableDriver.
-  // This is used to perform actions with the driver (i.e. invoke start(), join()),
-  // which no other code should do.  It also permits us to save the reference until we are ready to
-  // make the driver ready by invoking SettableDriver.initialize().
-  private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
-
   @Inject
   SchedulerLifecycle(
-      DriverFactory driverFactory,
       NonVolatileStorage storage,
       Lifecycle lifecycle,
-      SettableDriver driver,
+      Driver driver,
       LeadingOptions leadingOptions,
       ScheduledExecutorService executorService,
       Clock clock,
@@ -146,7 +134,6 @@ public class SchedulerLifecycle implements EventSubscriber {
       @SchedulerActive ServiceManagerIface schedulerActiveServiceManager) {
 
     this(
-        driverFactory,
         storage,
         lifecycle,
         driver,
@@ -209,10 +196,9 @@ public class SchedulerLifecycle implements EventSubscriber {
 
   @VisibleForTesting
   SchedulerLifecycle(
-      final DriverFactory driverFactory,
       final NonVolatileStorage storage,
       final Lifecycle lifecycle,
-      final SettableDriver driver,
+      final Driver driver,
       final DelayedActions delayedActions,
       final Clock clock,
       final EventSink eventSink,
@@ -220,7 +206,6 @@ public class SchedulerLifecycle implements EventSubscriber {
       StatsProvider statsProvider,
       final ServiceManagerIface schedulerActiveServiceManager) {
 
-    requireNonNull(driverFactory);
     requireNonNull(storage);
     requireNonNull(lifecycle);
     requireNonNull(driver);
@@ -276,17 +261,7 @@ public class SchedulerLifecycle implements EventSubscriber {
           }
         });
 
-        final Optional<String> frameworkId = storage.consistentRead(
-            new Work.Quiet<Optional<String>>() {
-              @Override
-              public Optional<String> apply(StoreProvider storeProvider) {
-                return storeProvider.getSchedulerStore().fetchFrameworkId();
-              }
-            });
-
-        // Save the prepared driver locally, but don't expose it until the registered callback is
-        // received.
-        driverRef.set(driverFactory.apply(frameworkId.orNull()));
+        driver.startAsync().awaitRunning();
 
         delayedActions.onRegistrationTimeout(
             new Runnable() {
@@ -308,9 +283,6 @@ public class SchedulerLifecycle implements EventSubscriber {
                 stateMachine.transition(State.DEAD);
               }
             });
-
-        Protos.Status status = driverRef.get().start();
-        LOG.info("Driver started with code " + status);
       }
     };
 
@@ -318,12 +290,10 @@ public class SchedulerLifecycle implements EventSubscriber {
       @Override
       public void execute(Transition<State> transition) {
         registrationAcked.set(true);
-        driver.initialize(driverRef.get());
         delayedActions.blockingDriverJoin(new Runnable() {
           @Override
           public void run() {
-            // Blocks until driver exits.
-            driverRef.get().join();
+            driver.blockUntilStopped();
             LOG.info("Driver exited, terminating lifecycle.");
             stateMachine.transition(State.DEAD);
           }
@@ -365,7 +335,7 @@ public class SchedulerLifecycle implements EventSubscriber {
 
           // TODO(wfarner): Re-evaluate tear-down ordering here.  Should the top-level shutdown
           // be invoked first, or the underlying critical components?
-          driver.stop();
+          driver.stopAsync().awaitTerminated();
           storage.stop();
         } finally {
           lifecycle.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 8d30640..72d3d60 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler;
 
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
 
@@ -37,15 +36,12 @@ import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.GuavaUtils;
 import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.mesos.Scheduler;
 
 /**
  * Binding module for top-level scheduling logic.
@@ -66,21 +62,9 @@ public class SchedulerModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    bind(Driver.class).to(DriverImpl.class);
-    bind(SettableDriver.class).to(DriverImpl.class);
-    bind(DriverImpl.class).in(Singleton.class);
-
-    bind(Scheduler.class).to(MesosSchedulerImpl.class);
-    bind(MesosSchedulerImpl.class).in(Singleton.class);
-
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
-
     bind(UserTaskLauncher.class).in(Singleton.class);
 
-    // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
-    bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
-        .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
-
     install(new PrivateModule() {
       @Override
       protected void configure() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 9ec3f41..fef76f5 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -49,6 +49,7 @@ import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
+import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
 import org.apache.aurora.scheduler.quota.QuotaModule;
 import org.apache.aurora.scheduler.sla.SlaModule;
@@ -115,6 +116,7 @@ class AppModule extends AbstractModule {
     install(new MetadataModule());
     install(new QuotaModule());
     install(new JettyServerModule());
+    install(new SchedulerDriverModule());
     install(new SchedulerModule());
     install(new StateModule());
     install(new SlaModule());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 0487409..288e7cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -17,7 +17,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 
 import javax.inject.Inject;
-import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -46,12 +45,12 @@ import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
 import org.apache.aurora.auth.CapabilityValidator;
 import org.apache.aurora.auth.SessionValidator;
 import org.apache.aurora.auth.UnsecureAuthModule;
-import org.apache.aurora.scheduler.DriverFactory;
-import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
+import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
 import org.apache.aurora.scheduler.storage.db.DbModule;
 import org.apache.aurora.scheduler.storage.db.MigrationModule;
@@ -157,8 +156,8 @@ public class SchedulerMain extends AbstractApplication {
     return new AbstractModule() {
       @Override
       protected void configure() {
-        bind(DriverFactory.class).to(DriverFactoryImpl.class);
-        bind(DriverFactoryImpl.class).in(Singleton.class);
+        install(new CommandLineDriverSettingsModule());
+        install(new LibMesosLoadingModule());
         install(new MesosLogStreamModule(zkClientConfig));
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 9bc42d3..4e37f4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -46,6 +46,7 @@ import com.twitter.common.util.BackoffStrategy;
 import com.twitter.common.util.Random;
 import com.twitter.common.util.TruncatedBinaryBackoff;
 
+import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
@@ -254,6 +255,7 @@ public class AsyncModule extends AbstractModule {
       }
     });
     PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
+    SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
 
     install(new PrivateModule() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index feb70a9..79d8d8d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -41,12 +41,12 @@ import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
index 0b023a2..8e9a1dc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -21,7 +21,6 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.base.Joiner;
-
 import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
index e656c7e..d6b7fab 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
@@ -26,10 +26,10 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.BackoffStrategy;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index 1a45d08..14c3e3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -37,10 +37,10 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 8914022..8217c51 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async;
 import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -25,6 +24,7 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
@@ -40,10 +40,13 @@ import static java.util.Objects.requireNonNull;
  * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
  * tasks will be transitioned to the LOST state.
  */
-class TaskTimeout implements EventSubscriber {
+class TaskTimeout extends AbstractIdleService implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
 
   @VisibleForTesting
+  static final Amount<Long, Time> NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS);
+
+  @VisibleForTesting
   static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
 
   @VisibleForTesting
@@ -59,7 +62,7 @@ class TaskTimeout implements EventSubscriber {
 
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
-  private final long timeoutMillis;
+  private final Amount<Long, Time> timeout;
   private final AtomicLong timedOutTasks;
 
   @Inject
@@ -71,7 +74,7 @@ class TaskTimeout implements EventSubscriber {
 
     this.executor = requireNonNull(executor);
     this.stateManager = requireNonNull(stateManager);
-    this.timeoutMillis = timeout.as(Time.MILLISECONDS);
+    this.timeout = requireNonNull(timeout);
     this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
   }
 
@@ -79,6 +82,17 @@ class TaskTimeout implements EventSubscriber {
     return TRANSIENT_STATES.contains(status);
   }
 
+  @Override
+  protected void startUp() {
+    // No work to do here for startup; however, we leverage the state tracking in
+    // AbstractIdleService.
+  }
+
+  @Override
+  protected void shutDown() {
+    // Nothing to do for shutting down.
+  }
+
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
     final String taskId = change.getTaskId();
@@ -88,24 +102,34 @@ class TaskTimeout implements EventSubscriber {
           new Runnable() {
             @Override
             public void run() {
-              // This query acts as a CAS by including the state that we expect the task to be in if
-              // the timeout is still valid.  Ideally, the future would have already been canceled,
-              // but in the event of a state transition race, including transientState prevents an
-              // unintended task timeout.
-              // Note: This requires LOST transitions trigger Driver.killTask.
-              if (stateManager.changeState(
-                  taskId,
-                  Optional.of(newState),
-                  ScheduleStatus.LOST,
-                  TIMEOUT_MESSAGE)) {
-
-                LOG.info("Timeout reached for task " + taskId + ":" + taskId);
-                timedOutTasks.incrementAndGet();
+              if (isRunning()) {
+                // This query acts as a CAS by including the state that we expect the task to be in
+                // if the timeout is still valid.  Ideally, the future would have already been
+                // canceled, but in the event of a state transition race, including the expected
+                // state prevents an unintended task timeout.
+                // Note: This requires that LOST transitions trigger Driver.killTask.
+                if (stateManager.changeState(
+                    taskId,
+                    Optional.of(newState),
+                    ScheduleStatus.LOST,
+                    TIMEOUT_MESSAGE)) {
+
+                  LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+                  timedOutTasks.incrementAndGet();
+                }
+              } else {
+                // Our service is not running (e.g. not yet started).  We don't want to lose track
+                // of the task, so we will try again later.
+                LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
+                executor.schedule(
+                    this,
+                    NOT_STARTED_RETRY.getValue(),
+                    NOT_STARTED_RETRY.getUnit().getTimeUnit());
               }
             }
           },
-          timeoutMillis,
-          TimeUnit.MILLISECONDS);
+          timeout.getValue(),
+          timeout.getUnit().getTimeUnit());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
new file mode 100644
index 0000000..d099420
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.inject.AbstractModule;
+import com.google.protobuf.ByteString;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.mesos.Protos;
+
+import static org.apache.mesos.Protos.FrameworkInfo;
+
+/**
+ * Creates and binds {@link DriverSettings} based on values found on the command line.
+ */
+public class CommandLineDriverSettingsModule extends AbstractModule {
+
+  private static final Logger LOG =
+      Logger.getLogger(CommandLineDriverSettingsModule.class.getName());
+
+  @NotNull
+  @CmdLine(name = "mesos_master_address",
+      help = "Address for the mesos master, can be a socket address or zookeeper path.")
+  private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
+
+  @VisibleForTesting
+  static final String PRINCIPAL_KEY = "aurora_authentication_principal";
+  @VisibleForTesting
+  static final String SECRET_KEY = "aurora_authentication_secret";
+
+  @CmdLine(name = "framework_authentication_file",
+      help = "Properties file which contains framework credentials to authenticate with Mesos "
+          + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
+          + "'" + SECRET_KEY + "'.")
+  private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
+
+  @CmdLine(name = "framework_failover_timeout",
+      help = "Time after which a framework is considered deleted.  SHOULD BE VERY HIGH.")
+  private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
+      Arg.create(Amount.of(21L, Time.DAYS));
+
+  /**
+   * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
+   * attempt to write checkpoints when required by a task's framework. These checkpoints allow
+   * executors to be reattached rather than killed when a slave is restarted.
+   *
+   * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
+   * enabled.
+   *
+   * Behavior is as follows:
+   * (Scheduler -require_slave_checkpoint=true,  Slave --checkpoint=true):
+   *   Tasks will launch.        Checkpoints will be written.
+   * (Scheduler -require_slave_checkpoint=true,  Slave --checkpoint=false):
+   *   Tasks WILL NOT launch.
+   * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=true):
+   *   Tasks will launch.        Checkpoints will not be written.
+   * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=false):
+   *   Tasks will launch.        Checkpoints will not be written.
+   *
+   * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
+   * is resolved.
+   */
+  @CmdLine(name = "require_slave_checkpoint",
+      help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
+          + "slave restart should not kill executors, but the scheduler will not be able to "
+          + "launch tasks on slaves without --checkpoint=true in their command lines. See "
+          + "CommandLineDriverSettingsModule.java for more information.")
+  private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
+
+  @CmdLine(name = "executor_user",
+      help = "User to start the executor. Defaults to \"root\". "
+          + "Set this to an unprivileged user if the mesos master was started with "
+          + "\"--no-root_submissions\". If set to anything other than \"root\", the executor "
+          + "will ignore the \"role\" setting for jobs since it can't use setuid() anymore. "
+          + "This means that all your jobs will run under the specified user and the user has "
+          + "to exist on the mesos slaves.")
+  private static final Arg<String> EXECUTOR_USER = Arg.create("root");
+
+  // TODO(wfarner): Figure out a way to change this without risk of fallout (MESOS-703).
+  private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
+
+  /**
+   * Assembles the {@link FrameworkInfo} from the command line flags and binds a single
+   * {@link DriverSettings} instance holding it, the master address and any credentials.
+   */
+  @Override
+  protected void configure() {
+    FrameworkInfo frameworkInfo = FrameworkInfo.newBuilder()
+        .setUser(EXECUTOR_USER.get())
+        .setName(TWITTER_FRAMEWORK_NAME)
+        .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
+        .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS))
+        .build();
+    DriverSettings settings =
+        new DriverSettings(MESOS_MASTER_ADDRESS.get(), getCredentials(), frameworkInfo);
+    bind(DriverSettings.class).toInstance(settings);
+  }
+
+  /**
+   * Reads the framework credentials from the file given by
+   * {@code -framework_authentication_file}, when that flag was supplied.
+   *
+   * @return The credentials found in the file, or absent when the flag was not applied.
+   */
+  private static Optional<Protos.Credential> getCredentials() {
+    if (!FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
+      return Optional.absent();
+    }
+
+    Properties properties;
+    // try-with-resources releases the file handle once the properties have been parsed.
+    try (InputStream credentialsStream =
+        new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get())) {
+      properties = parseCredentials(credentialsStream);
+    } catch (FileNotFoundException e) {
+      LOG.severe("Authentication File not Found");
+      throw Throwables.propagate(e);
+    } catch (IOException e) {
+      // Only reachable when closing the stream fails; read errors are handled while parsing.
+      LOG.severe("Unable to close authentication file");
+      throw Throwables.propagate(e);
+    }
+
+    LOG.info(String.format("Connecting to master using authentication (principal: %s).",
+        properties.getProperty(PRINCIPAL_KEY)));
+
+    return Optional.of(Protos.Credential.newBuilder()
+        .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
+        .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
+        .build());
+  }
+
+  /**
+   * Loads credentials in {@link Properties} format and verifies both required keys exist.
+   * The stream is not closed here; that is left to the caller.
+   *
+   * @param credentialsStream Stream to read the properties from.
+   * @return The loaded properties, containing at least {@link #PRINCIPAL_KEY} and
+   *         {@link #SECRET_KEY}.
+   * @throws IllegalStateException If either required key is missing.
+   */
+  @VisibleForTesting
+  static Properties parseCredentials(InputStream credentialsStream) {
+    Properties properties = new Properties();
+    try {
+      properties.load(credentialsStream);
+    } catch (IOException e) {
+      LOG.severe("Unable to load authentication file");
+      throw Throwables.propagate(e);
+    }
+    Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
+        "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
+    Preconditions.checkState(properties.containsKey(SECRET_KEY),
+        "The framework authentication file is missing the key: %s", SECRET_KEY);
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
new file mode 100644
index 0000000..c7e45a8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.util.concurrent.Service;
+
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+
+/**
+ * Wraps the mesos Scheduler driver to ensure it is used in a valid lifecycle; namely:
+ * <pre>
+ *   (run -> kill*)? -> stop*
+ * </pre>
+ *
+ * Also ensures the driver is only asked for when needed. Being a {@link Service}, implementations
+ * also expose the service lifecycle methods to callers.
+ */
+public interface Driver extends Service {
+
+  /**
+   * Launches a task.
+   *
+   * @param offerId ID of the resource offer to accept with the task.
+   * @param task Task to launch.
+   */
+  void launchTask(OfferID offerId, TaskInfo task);
+
+  /**
+   * Declines a resource offer.
+   *
+   * @param offerId ID of the offer to decline.
+   */
+  void declineOffer(OfferID offerId);
+
+  /**
+   * Sends a kill task request for the given {@code taskId} to the mesos master.
+   *
+   * @param taskId The id of the task to kill.
+   */
+  void killTask(String taskId);
+
+  /**
+   * Blocks the calling thread until the driver is no longer active.
+   */
+  void blockUntilStopped();
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
new file mode 100644
index 0000000..92d8924
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+/**
+ * A layer over the constructor for {@link org.apache.mesos.MesosSchedulerDriver}. This is needed
+ * since {@link org.apache.mesos.MesosSchedulerDriver} statically loads libmesos.
+ */
+public interface DriverFactory {
+  /**
+   * Creates a scheduler driver.
+   *
+   * @param scheduler Scheduler handed to the driver (presumably the recipient of its callbacks).
+   * @param credentials Credentials to supply to the driver, if any.
+   * @param frameworkInfo Framework description to supply to the driver.
+   * @param master Master to connect to (presumably an address; confirm format with callers).
+   * @return A new driver instance.
+   */
+  SchedulerDriver create(
+      Scheduler scheduler,
+      Optional<Protos.Credential> credentials,
+      Protos.FrameworkInfo frameworkInfo,
+      String master);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
new file mode 100644
index 0000000..db7aa74
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static org.apache.mesos.Protos.FrameworkInfo;
+
+/**
+ * Thin {@link DriverFactory} that does nothing but invoke the {@link MesosSchedulerDriver}
+ * constructors, so that code requiring the libmesos native library stays confined to this class.
+ */
+class DriverFactoryImpl implements DriverFactory {
+
+  /**
+   * Builds a {@link MesosSchedulerDriver}, choosing the constructor by credential presence.
+   */
+  @Override
+  public SchedulerDriver create(
+      Scheduler scheduler,
+      Optional<Protos.Credential> credentials,
+      FrameworkInfo frameworkInfo,
+      String master) {
+
+    // No credentials supplied: use the constructor that takes no credential argument.
+    if (!credentials.isPresent()) {
+      return new MesosSchedulerDriver(scheduler, frameworkInfo, master);
+    }
+    return new MesosSchedulerDriver(scheduler, frameworkInfo, master, credentials.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
new file mode 100644
index 0000000..85d471f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Settings required to create a scheduler driver. Instances hold three final, non-null values
+ * that are fixed at construction time.
+ */
+@VisibleForTesting
+public class DriverSettings {
+  private final String masterUri;
+  private final Optional<Protos.Credential> credentials;
+  private final Protos.FrameworkInfo frameworkInfo;
+
+  /**
+   * Creates a settings holder.
+   *
+   * @param masterUri URI of the master.
+   * @param credentials Credentials, if any.
+   * @param frameworkInfo Framework description.
+   * @throws NullPointerException If any argument is {@code null}.
+   */
+  public DriverSettings(
+      String masterUri,
+      Optional<Protos.Credential> credentials,
+      Protos.FrameworkInfo frameworkInfo) {
+
+    this.masterUri = requireNonNull(masterUri);
+    this.credentials = requireNonNull(credentials);
+    this.frameworkInfo = requireNonNull(frameworkInfo);
+  }
+
+  /**
+   * @return The master URI given at construction.
+   */
+  public String getMasterUri() {
+    return masterUri;
+  }
+
+  /**
+   * @return The credentials given at construction; never {@code null}, possibly absent.
+   */
+  public Optional<Protos.Credential> getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * @return The framework description given at construction.
+   */
+  public Protos.FrameworkInfo getFrameworkInfo() {
+    return frameworkInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
new file mode 100644
index 0000000..e1a2359
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * A module that binds a driver factory which requires the libmesos native library.
+ */
+public class LibMesosLoadingModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    // Route DriverFactory requests to the implementation that constructs MesosSchedulerDriver.
+    bind(DriverFactory.class).to(DriverFactoryImpl.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
new file mode 100644
index 0000000..2d382f7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.GuiceUtils.AllowUnchecked;
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Location for communication with mesos.
+ *
+ * Implements the mesos {@link Scheduler} callbacks: resource offers and status updates are
+ * forwarded to the configured {@link TaskLauncher}s, the framework ID and host attributes are
+ * written to {@link Storage}, and driver registration/disconnection is published to the
+ * {@link EventSink}.
+ */
+class MesosSchedulerImpl implements Scheduler {
+  private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
+
+  private final AtomicLong totalResourceOffers = Stats.exportLong("scheduler_resource_offers");
+  // NOTE(review): incremented only for status updates that no launcher handled, yet exported as
+  // "scheduler_status_updates" - confirm whether the stat name is intentional.
+  private final AtomicLong totalFailedStatusUpdates = Stats.exportLong("scheduler_status_updates");
+  private final AtomicLong totalFrameworkDisconnects =
+      Stats.exportLong("scheduler_framework_disconnects");
+  private final AtomicLong totalFrameworkReregisters =
+      Stats.exportLong("scheduler_framework_reregisters");
+  private final AtomicLong totalLostExecutors = Stats.exportLong("scheduler_lost_executors");
+
+  private final List<TaskLauncher> taskLaunchers;
+
+  private final Storage storage;
+  private final Lifecycle lifecycle;
+  private final EventSink eventSink;
+  private final Executor executor;
+  // Set once registered() has run; resourceOffers() refuses to proceed until then.
+  private volatile boolean isRegistered = false;
+
+  /**
+   * Binding annotation for the executor the incoming Mesos message handler uses.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface SchedulerExecutor { }
+
+  /**
+   * Creates a new scheduler.
+   *
+   * @param storage Store to save host attributes into.
+   * @param lifecycle Application lifecycle manager.
+   * @param taskLaunchers Task launchers, which will be used in order.  Calls to
+   *                      {@link TaskLauncher#willUse(Offer)} and
+   *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
+   *                      launchers, ceasing after the first match (based on a return value of
+   *                      {@code true}).
+   * @param eventSink Pubsub sink to send driver status changes to.
+   * @param executor Executor for async work
+   */
+  @Inject
+  public MesosSchedulerImpl(
+      Storage storage,
+      final Lifecycle lifecycle,
+      List<TaskLauncher> taskLaunchers,
+      EventSink eventSink,
+      @SchedulerExecutor Executor executor) {
+
+    this.storage = requireNonNull(storage);
+    this.lifecycle = requireNonNull(lifecycle);
+    this.taskLaunchers = requireNonNull(taskLaunchers);
+    this.eventSink = requireNonNull(eventSink);
+    this.executor = requireNonNull(executor);
+  }
+
+  /**
+   * Logs the lost slave; no other action is taken here.
+   */
+  @Override
+  public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
+    LOG.info("Received notification of lost slave: " + slaveId);
+  }
+
+  /**
+   * Persists the assigned framework ID, marks this scheduler as registered, and then announces
+   * the registration on the event sink.
+   */
+  @Override
+  public void registered(
+      SchedulerDriver driver,
+      final FrameworkID frameworkId,
+      MasterInfo masterInfo) {
+
+    LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
+      }
+    });
+    // The flag is flipped only after the framework ID write has returned, and before subscribers
+    // are notified.
+    isRegistered = true;
+    eventSink.post(new DriverRegistered());
+  }
+
+  /**
+   * Counts the disconnect and publishes a {@link DriverDisconnected} event.
+   */
+  @Override
+  public void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warning("Framework disconnected.");
+    totalFrameworkDisconnects.incrementAndGet();
+    eventSink.post(new DriverDisconnected());
+  }
+
+  /**
+   * Logs and counts the re-registration.
+   */
+  @Override
+  public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
+    LOG.info("Framework re-registered with master " + masterInfo);
+    totalFrameworkReregisters.incrementAndGet();
+  }
+
+  /**
+   * Hands the offers to the executor, where their host attributes are saved and each offer is
+   * then presented to the task launchers in order until one of them accepts it.
+   *
+   * @throws IllegalStateException If called before {@code registered} has completed.
+   */
+  @Timed("scheduler_resource_offers")
+  @Override
+  public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
+    Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
+
+    // Store all host attributes in a single write operation to prevent other threads from
+    // securing the storage lock between saves.  We also save the host attributes before passing
+    // offers elsewhere to ensure that host attributes are available before attempting to
+    // schedule tasks associated with offers.
+    // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+    //                offers when the host attributes cannot be found. (AURORA-137)
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider storeProvider) {
+            for (final Offer offer : offers) {
+              storeProvider.getAttributeStore()
+                  .saveHostAttributes(Conversions.getAttributes(offer));
+            }
+          }
+        });
+
+        for (Offer offer : offers) {
+          if (LOG.isLoggable(Level.FINE)) {
+            LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+          }
+          totalResourceOffers.incrementAndGet();
+          // The first launcher that reports it will use the offer wins; later ones never see it.
+          for (TaskLauncher launcher : taskLaunchers) {
+            if (launcher.willUse(offer)) {
+              break;
+            }
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Notifies every task launcher that the offer has been rescinded.
+   */
+  @Override
+  public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
+    LOG.info("Offer rescinded: " + offerId);
+    for (TaskLauncher launcher : taskLaunchers) {
+      launcher.cancelOffer(offerId);
+    }
+  }
+
+  /**
+   * Logs the status update and passes it to the task launchers in order, stopping at the first
+   * one that reports it handled. An update no launcher handles is logged and counted.
+   *
+   * @throws SchedulerException Re-thrown unchanged when a launcher fails with it.
+   */
+  @AllowUnchecked
+  @Timed("scheduler_status_update")
+  @Override
+  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
+    String info = status.hasData() ? status.getData().toStringUtf8() : null;
+    String infoMsg = info == null ? "" : " with info " + info;
+    String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
+    LOG.info("Received status update for task " + status.getTaskId().getValue()
+        + " in state " + status.getState() + infoMsg + coreMsg);
+
+    try {
+      for (TaskLauncher launcher : taskLaunchers) {
+        if (launcher.statusUpdate(status)) {
+          return;
+        }
+      }
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
+      // We re-throw the exception here in an effort to NACK the status update and trigger an
+      // abort of the driver.  Previously we directly issued driver.abort(), but the re-entrancy
+      // guarantees of that are uncertain (and we believe it was not working).  However, this
+      // was difficult to discern since logging is unreliable during JVM shutdown and we would not
+      // see the above log message.
+      throw e;
+    }
+
+    // Reached only when every launcher declined the update.
+    LOG.warning("Unhandled status update " + status);
+    totalFailedStatusUpdates.incrementAndGet();
+  }
+
+  /**
+   * Logs the error message and requests shutdown through the application lifecycle.
+   */
+  @Override
+  public void error(SchedulerDriver driver, String message) {
+    LOG.severe("Received error message: " + message);
+    lifecycle.shutdown();
+  }
+
+  /**
+   * Logs and counts the lost executor.
+   */
+  @Override
+  public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
+      int status) {
+
+    LOG.info("Lost executor " + executorID);
+    totalLostExecutors.incrementAndGet();
+  }
+
+  /**
+   * Framework messages are not used; a warning is logged and the message is dropped.
+   */
+  @Timed("scheduler_framework_message")
+  @Override
+  public void frameworkMessage(
+      SchedulerDriver driver,
+      ExecutorID executorID,
+      SlaveID slave,
+      byte[] data) {
+
+    LOG.warning("Ignoring framework message.");
+  }
+}