You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/08 00:11:30 UTC

[2/3] incubator-aurora git commit: Simplify management of the driver lifecycle using AbstractIdleService.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
new file mode 100644
index 0000000..bb227fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import com.twitter.common.quantity.Data;
+
+import org.apache.aurora.Protobufs;
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.CommandUtil;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * A factory to create mesos task objects.
+ */
+public interface MesosTaskFactory {
+
+  /**
+   * Creates a mesos task object.
+   *
+   * @param task Assigned task to translate into a task object.
+   * @param slaveId Id of the slave the task is being assigned to.
+   * @return A new task.
+   * @throws SchedulerException If the task could not be encoded.
+   */
+  TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+
+  class ExecutorConfig {
+    private final String executorPath;
+
+    public ExecutorConfig(String executorPath) {
+      this.executorPath = checkNotBlank(executorPath);
+    }
+
+    String getExecutorPath() {
+      return executorPath;
+    }
+  }
+
+  // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
+  class MesosTaskFactoryImpl implements MesosTaskFactory {
+    private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
+    private static final String EXECUTOR_PREFIX = "thermos-";
+
+    /**
+     * Name to associate with task executors.
+     */
+    @VisibleForTesting
+    static final String EXECUTOR_NAME = "aurora.task";
+
+    private final String executorPath;
+
+    @Inject
+    MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
+      this.executorPath = executorConfig.getExecutorPath();
+    }
+
+    @VisibleForTesting
+    static ExecutorID getExecutorId(String taskId) {
+      return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
+    }
+
+    public static String getJobSourceName(IJobKey jobkey) {
+      return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
+    }
+
+    public static String getJobSourceName(ITaskConfig task) {
+      return getJobSourceName(task.getJob());
+    }
+
+    public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+      return String.format("%s.%s", getJobSourceName(task), instanceId);
+    }
+
+    @Override
+    public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+      requireNonNull(task);
+      byte[] taskInBytes;
+      try {
+        taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
+      } catch (ThriftBinaryCodec.CodingException e) {
+        LOG.log(Level.SEVERE, "Unable to serialize task.", e);
+        throw new SchedulerException("Internal error.", e);
+      }
+
+      ITaskConfig config = task.getTask();
+      // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
+      List<Resource> resources = Resources.from(config)
+          .toResourceList(task.isSetAssignedPorts()
+              ? ImmutableSet.copyOf(task.getAssignedPorts().values())
+              : ImmutableSet.<Integer>of());
+
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.fine("Setting task resources to "
+            + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
+      }
+      TaskInfo.Builder taskBuilder =
+          TaskInfo.newBuilder()
+              .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+              .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
+              .setSlaveId(slaveId)
+              .addAllResources(resources)
+              .setData(ByteString.copyFrom(taskInBytes));
+
+      ExecutorInfo executor = ExecutorInfo.newBuilder()
+          .setCommand(CommandUtil.create(executorPath))
+          .setExecutorId(getExecutorId(task.getTaskId()))
+          .setName(EXECUTOR_NAME)
+          .setSource(getInstanceSourceName(config, task.getInstanceId()))
+          .addResources(
+              Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
+          .addResources(Resources.makeMesosResource(
+              Resources.RAM_MB,
+              ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+          .build();
+      return taskBuilder
+          .setExecutor(executor)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
new file mode 100644
index 0000000..59ad9e6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.inject.PrivateModule;
+
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.mesos.Scheduler;
+
+/**
+ * A module that creates a {@link Driver} binding.
+ */
+public class SchedulerDriverModule extends PrivateModule {
+  private static final Logger LOG = Logger.getLogger(SchedulerDriverModule.class.getName());
+
+  @Override
+  protected void configure() {
+    bind(Driver.class).to(SchedulerDriverService.class);
+    bind(SchedulerDriverService.class).in(Singleton.class);
+    expose(Driver.class);
+
+    bind(Scheduler.class).to(MesosSchedulerImpl.class);
+    bind(MesosSchedulerImpl.class).in(Singleton.class);
+
+    // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
+    bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+        .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
new file mode 100644
index 0000000..88150e5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
+
+/**
+ * Manages the lifecycle of the scheduler driver, and provides a more constrained API to use it.
+ */
+class SchedulerDriverService extends AbstractIdleService implements Driver {
+  private static final Logger LOG = Logger.getLogger(SchedulerDriverService.class.getName());
+
+  private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
+  private final DriverFactory driverFactory;
+
+  private final Scheduler scheduler;
+  private final Storage storage;
+  private final DriverSettings driverSettings;
+  private final SettableFuture<SchedulerDriver> driverFuture = SettableFuture.create();
+
+  @Inject
+  SchedulerDriverService(
+      Scheduler scheduler,
+      Storage storage,
+      DriverSettings driverSettings,
+      DriverFactory driverFactory) {
+
+    this.scheduler = requireNonNull(scheduler);
+    this.storage = requireNonNull(storage);
+    this.driverSettings = requireNonNull(driverSettings);
+    this.driverFactory = requireNonNull(driverFactory);
+  }
+
+  @Override
+  protected void startUp() {
+    Optional<String> frameworkId = storage.consistentRead(
+        new Storage.Work.Quiet<Optional<String>>() {
+          @Override
+          public Optional<String> apply(Storage.StoreProvider storeProvider) {
+            return storeProvider.getSchedulerStore().fetchFrameworkId();
+          }
+        });
+
+    LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri());
+    if (!driverSettings.getCredentials().isPresent()) {
+      LOG.warning("Connecting to master without authentication!");
+    }
+
+    FrameworkInfo.Builder frameworkBuilder = driverSettings.getFrameworkInfo().toBuilder();
+
+    if (frameworkId.isPresent()) {
+      LOG.info("Found persisted framework ID: " + frameworkId);
+      frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get()));
+    } else {
+      LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
+    }
+
+    SchedulerDriver schedulerDriver = driverFactory.create(
+        scheduler,
+        driverSettings.getCredentials(),
+        frameworkBuilder.build(),
+        driverSettings.getMasterUri());
+    Protos.Status status = schedulerDriver.start();
+    LOG.info("Driver started with code " + status);
+
+    driverFuture.set(schedulerDriver);
+  }
+
+  @Override
+  public void blockUntilStopped() {
+    Futures.getUnchecked(driverFuture).join();
+  }
+
+  @Override
+  protected void shutDown() throws ExecutionException, InterruptedException {
+    // WARNING: stop() and stop(false) are dangerous, avoid at all costs. See the docs for
+    // SchedulerDriver for more details.
+    driverFuture.get().stop(true /* failover */);
+  }
+
+  @Override
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) {
+    checkState(isRunning(), "Driver is not running.");
+    Futures.getUnchecked(driverFuture)
+        .launchTasks(ImmutableList.of(offerId), ImmutableList.of(task));
+  }
+
+  @Override
+  public void declineOffer(Protos.OfferID offerId) {
+    checkState(isRunning(), "Driver is not running.");
+    Futures.getUnchecked(driverFuture).declineOffer(offerId);
+  }
+
+  @Override
+  public void killTask(String taskId) {
+    checkState(isRunning(), "Driver is not running.");
+    Protos.Status status = Futures.getUnchecked(driverFuture).killTask(
+        Protos.TaskID.newBuilder().setValue(taskId).build());
+
+    if (status != DRIVER_RUNNING) {
+      LOG.severe(String.format("Attempt to kill task %s failed with code %s",
+          taskId, status));
+      killFailures.incrementAndGet();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6d2ac49..6663555 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -45,13 +45,13 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index fe16d3a..0186484 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -19,9 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 
-import org.apache.aurora.scheduler.MesosTaskFactory;
-import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 78a9670..9c9b659 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -20,13 +20,13 @@ import javax.inject.Inject;
 
 import com.google.common.base.Optional;
 
-import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.Offer;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
index cbb711f..f4fa1cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.storage.log;
 
 import java.nio.ByteBuffer;
+
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index 855573e..cb95d89 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
+
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
deleted file mode 100644
index 0f61922..0000000
--- a/src/test/java/org/apache/aurora/scheduler/DriverFactoryImplTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import com.google.common.base.Throwables;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class DriverFactoryImplTest extends EasyMockTest {
-
-  @Test(expected = IllegalStateException.class)
-  public void testMissingPropertiesParsing() {
-    Properties testProperties = new Properties();
-    testProperties.put(DriverFactoryImpl.PRINCIPAL_KEY, "aurora-scheduler");
-
-    ByteArrayOutputStream propertiesStream = new ByteArrayOutputStream();
-    try {
-      testProperties.store(propertiesStream, "");
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-    control.replay();
-    DriverFactoryImpl.parseCredentials(new ByteArrayInputStream(propertiesStream.toByteArray()));
-  }
-
-  @Test
-  public void testPropertiesParsing() {
-    Properties testProperties = new Properties();
-    testProperties.put(DriverFactoryImpl.PRINCIPAL_KEY, "aurora-scheduler");
-    testProperties.put(DriverFactoryImpl.SECRET_KEY, "secret");
-
-    ByteArrayOutputStream propertiesStream = new ByteArrayOutputStream();
-    try {
-      testProperties.store(propertiesStream, "");
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-    control.replay();
-    assertEquals(testProperties,
-        DriverFactoryImpl.parseCredentials(
-            new ByteArrayInputStream(propertiesStream.toByteArray())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/DriverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/DriverTest.java b/src/test/java/org/apache/aurora/scheduler/DriverTest.java
deleted file mode 100644
index a96dd87..0000000
--- a/src/test/java/org/apache/aurora/scheduler/DriverTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.mesos.Protos.Status.DRIVER_ABORTED;
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-import static org.easymock.EasyMock.expect;
-
-public class DriverTest extends EasyMockTest {
-
-  private static final String TASK_1 = "1";
-  private static final String TASK_2 = "2";
-
-  private SchedulerDriver schedulerDriver;
-  private DriverImpl driver;
-
-  private static Protos.TaskID createTaskId(String taskId) {
-    return Protos.TaskID.newBuilder().setValue(taskId).build();
-  }
-
-  @Before
-  public void setUp() {
-    schedulerDriver = createMock(SchedulerDriver.class);
-    driver = new DriverImpl();
-  }
-
-  @Test
-  public void testNoopStop() {
-    control.replay();
-
-    driver.stop();
-  }
-
-  @Test
-  public void testMultipleStops() {
-    expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
-    control.replay();
-
-    driver.initialize(schedulerDriver);
-    driver.stop();
-    driver.stop();
-  }
-
-  @Test
-  public void testStop() {
-    expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
-    control.replay();
-
-    driver.initialize(schedulerDriver);
-    driver.stop();
-  }
-
-  @Test
-  public void testNormalLifecycle() {
-    expect(schedulerDriver.killTask(createTaskId(TASK_1))).andReturn(DRIVER_RUNNING);
-    expect(schedulerDriver.killTask(createTaskId(TASK_2))).andReturn(DRIVER_RUNNING);
-    expect(schedulerDriver.stop(true)).andReturn(DRIVER_ABORTED);
-    control.replay();
-
-    driver.initialize(schedulerDriver);
-    driver.killTask(TASK_1);
-    driver.killTask(TASK_2);
-    driver.stop();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMustRunBeforeKill() {
-    control.replay();
-
-    driver.killTask(TASK_1);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testOnlyOneSetAllowed() {
-    control.replay();
-
-    driver.initialize(schedulerDriver);
-    driver.initialize(schedulerDriver);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
deleted file mode 100644
index 8dd908e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.testing.TearDown;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Command;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertTrue;
-
-public class MesosSchedulerImplTest extends EasyMockTest {
-
-  private static final String FRAMEWORK_ID = "framework-id";
-  private static final FrameworkID FRAMEWORK =
-      FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
-
-  private static final String SLAVE_HOST = "slave-hostname";
-  private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("slave-id").build();
-  private static final String SLAVE_HOST_2 = "slave-hostname-2";
-  private static final SlaveID SLAVE_ID_2 = SlaveID.newBuilder().setValue("slave-id-2").build();
-  private static final ExecutorID EXECUTOR_ID =
-      ExecutorID.newBuilder().setValue("executor-id").build();
-
-  private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
-  private static final Offer OFFER = Offer.newBuilder()
-      .setFrameworkId(FRAMEWORK)
-      .setSlaveId(SLAVE_ID)
-      .setHostname(SLAVE_HOST)
-      .setId(OFFER_ID)
-      .build();
-  private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
-  private static final Offer OFFER_2 = Offer.newBuilder(OFFER)
-      .setSlaveId(SLAVE_ID_2)
-      .setHostname(SLAVE_HOST_2)
-      .setId(OFFER_ID_2)
-      .build();
-
-  private static final TaskStatus STATUS = TaskStatus.newBuilder()
-      .setState(TaskState.TASK_RUNNING)
-      .setTaskId(TaskID.newBuilder().setValue("task-id").build())
-      .build();
-
-  private StorageTestUtil storageUtil;
-  private TaskLauncher systemLauncher;
-  private TaskLauncher userLauncher;
-  private SchedulerDriver driver;
-  private EventSink eventSink;
-
-  private MesosSchedulerImpl scheduler;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    final Lifecycle lifecycle =
-        new Lifecycle(createMock(Command.class), createMock(UncaughtExceptionHandler.class));
-    systemLauncher = createMock(TaskLauncher.class);
-    userLauncher = createMock(TaskLauncher.class);
-    eventSink = createMock(EventSink.class);
-
-    Injector injector = Guice.createInjector(new AbstractModule() {
-      @Override
-      protected void configure() {
-        bind(Storage.class).toInstance(storageUtil.storage);
-        bind(Lifecycle.class).toInstance(lifecycle);
-        bind(new TypeLiteral<List<TaskLauncher>>() { })
-            .toInstance(Arrays.asList(systemLauncher, userLauncher));
-        bind(EventSink.class).toInstance(eventSink);
-        bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
-            .toInstance(MoreExecutors.sameThreadExecutor());
-      }
-    });
-    scheduler = injector.getInstance(MesosSchedulerImpl.class);
-    driver = createMock(SchedulerDriver.class);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testBadOrdering() {
-    control.replay();
-
-    // Should fail since the scheduler is not yet registered.
-    scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
-  }
-
-  @Test
-  public void testNoOffers() throws Exception {
-    new RegisteredFixture() {
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
-      }
-    }.run();
-  }
-
-  @Test
-  public void testNoAccepts() throws Exception {
-    new OfferFixture() {
-      @Override
-      void respondToOffer() throws Exception {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(false);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testOfferFirstAccepts() throws Exception {
-    new OfferFixture() {
-      @Override
-      void respondToOffer() throws Exception {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(true);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testOfferSchedulerAccepts() throws Exception {
-    new OfferFixture() {
-      @Override
-      void respondToOffer() throws Exception {
-        expectOfferAttributesSaved(OFFER);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(true);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testStatusUpdateNoAccepts() throws Exception {
-    new StatusFixture() {
-      @Override
-      void expectations() throws Exception {
-        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
-        expect(userLauncher.statusUpdate(STATUS)).andReturn(false);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testStatusUpdateFirstAccepts() throws Exception {
-    new StatusFixture() {
-      @Override
-      void expectations() throws Exception {
-        expect(systemLauncher.statusUpdate(STATUS)).andReturn(true);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testStatusUpdateSecondAccepts() throws Exception {
-    new StatusFixture() {
-      @Override
-      void expectations() throws Exception {
-        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
-        expect(userLauncher.statusUpdate(STATUS)).andReturn(true);
-      }
-    }.run();
-  }
-
-  @Test(expected = SchedulerException.class)
-  public void testStatusUpdateFails() throws Exception {
-    new StatusFixture() {
-      @Override
-      void expectations() throws Exception {
-        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
-        expect(userLauncher.statusUpdate(STATUS)).andThrow(new StorageException("Injected."));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testMultipleOffers() throws Exception {
-    new RegisteredFixture() {
-      @Override
-      void expectations() throws Exception {
-        expectOfferAttributesSaved(OFFER);
-        expectOfferAttributesSaved(OFFER_2);
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(true);
-        expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
-        expect(userLauncher.willUse(OFFER_2)).andReturn(false);
-      }
-
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver, ImmutableList.of(OFFER, OFFER_2));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testDisconnected() throws Exception {
-    new RegisteredFixture() {
-      @Override
-      void expectations() throws Exception {
-        eventSink.post(new DriverDisconnected());
-      }
-
-      @Override
-      void test() {
-        scheduler.disconnected(driver);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testFrameworkMessageIgnored() throws Exception {
-    control.replay();
-
-    scheduler.frameworkMessage(
-        driver,
-        EXECUTOR_ID,
-        SLAVE_ID,
-        "hello".getBytes(StandardCharsets.UTF_8));
-  }
-
-  private void expectOfferAttributesSaved(Offer offer) {
-    storageUtil.attributeStore.saveHostAttributes(Conversions.getAttributes(offer));
-  }
-
-  private abstract class RegisteredFixture {
-    private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
-    RegisteredFixture() throws Exception {
-      // Prevent otherwise silent noop tests that forget to call run().
-      addTearDown(new TearDown() {
-        @Override
-        public void tearDown() {
-          assertTrue(runCalled.get());
-        }
-      });
-    }
-
-    void run() throws Exception {
-      runCalled.set(true);
-      eventSink.post(new DriverRegistered());
-      storageUtil.expectOperations();
-      storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
-      expectations();
-
-      control.replay();
-
-      scheduler.registered(driver, FRAMEWORK, MasterInfo.getDefaultInstance());
-      test();
-    }
-
-    void expectations() throws Exception {
-      // Default no-op, subclasses may override.
-    }
-
-    abstract void test();
-  }
-
-  private abstract class OfferFixture extends RegisteredFixture {
-    OfferFixture() throws Exception {
-      super();
-    }
-
-    abstract void respondToOffer() throws Exception;
-
-    @Override
-    void expectations() throws Exception {
-      respondToOffer();
-    }
-
-    @Override
-    void test() {
-      scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
-    }
-  }
-
-  private abstract class StatusFixture extends RegisteredFixture {
-    StatusFixture() throws Exception {
-      super();
-    }
-
-    @Override
-    void test() {
-      scheduler.statusUpdate(driver, STATUS);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
deleted file mode 100644
index aa026cf..0000000
--- a/src/test/java/org/apache/aurora/scheduler/MesosTaskFactoryImplTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Data;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import org.apache.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class MesosTaskFactoryImplTest {
-
-  private static final String EXECUTOR_PATH = "/twitter/fake/executor.sh";
-  private static final IAssignedTask TASK = IAssignedTask.build(new AssignedTask()
-      .setInstanceId(2)
-      .setTaskId("task-id")
-      .setAssignedPorts(ImmutableMap.of("http", 80))
-      .setTask(new TaskConfig()
-          .setJob(new JobKey("role", "environment", "job-name"))
-          .setOwner(new Identity("role", "user"))
-          .setEnvironment("environment")
-          .setJobName("job-name")
-          .setDiskMb(10)
-          .setRamMb(100)
-          .setNumCpus(5)
-          .setRequestedPorts(ImmutableSet.of("http"))));
-  private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
-
-  private MesosTaskFactory taskFactory;
-
-  @Before
-  public void setUp() {
-    taskFactory = new MesosTaskFactoryImpl(new ExecutorConfig(EXECUTOR_PATH));
-  }
-
-  private static final ExecutorInfo DEFAULT_EXECUTOR = ExecutorInfo.newBuilder()
-      .setExecutorId(MesosTaskFactoryImpl.getExecutorId(TASK.getTaskId()))
-      .setName(MesosTaskFactoryImpl.EXECUTOR_NAME)
-      .setSource(MesosTaskFactoryImpl.getInstanceSourceName(TASK.getTask(), TASK.getInstanceId()))
-      .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
-      .addResources(Resources.makeMesosResource(
-          Resources.RAM_MB,
-          ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
-      .setCommand(CommandInfo.newBuilder()
-          .setValue("./executor.sh")
-          .addUris(URI.newBuilder().setValue(EXECUTOR_PATH).setExecutable(true)))
-      .build();
-
-  @Test
-  public void testExecutorInfoUnchanged() {
-    TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
-    assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
-    assertEquals(ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
-            Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
-            Resources.makeMesosRangeResource(
-                Resources.PORTS,
-                ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
-        ),
-        ImmutableSet.copyOf(task.getResourcesList()));
-  }
-
-  @Test
-  public void testCreateFromPortsUnset() {
-    AssignedTask assignedTask = TASK.newBuilder();
-    assignedTask.unsetAssignedPorts();
-    TaskInfo task = taskFactory.createFrom(IAssignedTask.build(assignedTask), SLAVE);
-    assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
-    assertEquals(ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
-            Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb())
-        ),
-        ImmutableSet.copyOf(task.getResourcesList()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 9861601..97ecb74 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Optional;
 import com.twitter.common.application.Lifecycle;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Command;
@@ -27,16 +26,14 @@ import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
 
 import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.SchedulerDriver;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -54,12 +51,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
 
   private static final String FRAMEWORK_ID = "framework id";
 
-  private DriverFactory driverFactory;
   private StorageTestUtil storageUtil;
   private ShutdownSystem shutdownRegistry;
-  private SettableDriver driver;
+  private Driver driver;
   private LeaderControl leaderControl;
-  private SchedulerDriver schedulerDriver;
   private DelayedActions delayedActions;
   private EventSink eventSink;
   private FakeStatsProvider statsProvider;
@@ -69,12 +64,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
 
   @Before
   public void setUp() {
-    driverFactory = createMock(DriverFactory.class);
     storageUtil = new StorageTestUtil(this);
     shutdownRegistry = createMock(ShutdownSystem.class);
-    driver = createMock(SettableDriver.class);
+    driver = createMock(Driver.class);
     leaderControl = createMock(LeaderControl.class);
-    schedulerDriver = createMock(SchedulerDriver.class);
     delayedActions = createMock(DelayedActions.class);
     eventSink = createMock(EventSink.class);
     statsProvider = new FakeStatsProvider();
@@ -96,7 +89,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     control.replay();
 
     schedulerLifecycle = new SchedulerLifecycle(
-        driverFactory,
         storageUtil.storage,
         new Lifecycle(shutdownRegistry, new UncaughtExceptionHandler() {
           @Override
@@ -119,12 +111,11 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   private void expectLoadStorage() {
     storageUtil.storage.start(EasyMock.<Quiet>anyObject());
     storageUtil.expectOperations();
-    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
   }
 
   private void expectInitializeDriver() {
-    driver.initialize(schedulerDriver);
-    expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+    expect(driver.startAsync()).andReturn(driver);
+    driver.awaitRunning();
     delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
   }
 
@@ -137,7 +128,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
 
   private void expectShutdown() throws Exception {
     leaderControl.leave();
-    driver.stop();
+    expect(driver.stopAsync()).andReturn(driver);
+    driver.awaitTerminated();
     storageUtil.storage.stop();
     shutdownRegistry.execute();
   }
@@ -150,7 +142,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
 
     storageUtil.storage.prepare();
     expectLoadStorage();
-    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
     Capture<Runnable> triggerFailover = createCapture();
     delayedActions.onAutoFailover(capture(triggerFailover));
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
@@ -176,11 +167,11 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   public void testRegistrationTimeout() throws Exception {
     storageUtil.storage.prepare();
     expectLoadStorage();
-    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
     delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
     Capture<Runnable> registrationTimeout = createCapture();
     delayedActions.onRegistrationTimeout(capture(registrationTimeout));
-    expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+    expect(driver.startAsync()).andReturn(driver);
+    driver.awaitRunning();
 
     expectShutdown();
 
@@ -195,10 +186,10 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   public void testDefeatedBeforeRegistered() throws Exception {
     storageUtil.storage.prepare();
     expectLoadStorage();
-    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
     delayedActions.onAutoFailover(EasyMock.<Runnable>anyObject());
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
-    expect(schedulerDriver.start()).andReturn(Status.DRIVER_RUNNING);
+    expect(driver.startAsync()).andReturn(driver);
+    driver.awaitRunning();
 
     // Important piece here is what's absent - leader presence is not advertised.
     expectShutdown();
@@ -234,7 +225,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   public void testExternalShutdown() throws Exception {
     storageUtil.storage.prepare();
     expectLoadStorage();
-    expect(driverFactory.apply(FRAMEWORK_ID)).andReturn(schedulerDriver);
     Capture<Runnable> triggerFailover = createCapture();
     delayedActions.onAutoFailover(capture(triggerFailover));
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 7137971..c903894 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -75,31 +75,39 @@ import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.DriverFactory;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.aurora.scheduler.mesos.DriverSettings;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
 import org.apache.aurora.scheduler.storage.log.EntrySerializer;
 import org.apache.aurora.scheduler.storage.log.LogStorageModule;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
 import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IMocksControl;
 import org.junit.Before;
 import org.junit.Test;
 
+import static com.twitter.common.testing.easymock.EasyMockTest.createCapture;
+
+import static org.apache.mesos.Protos.FrameworkInfo;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -114,6 +122,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
   private static final String STATS_URL_PREFIX = "fake_url";
   private static final String FRAMEWORK_ID = "integration_test_framework_id";
 
+  private static final DriverSettings SETTINGS = new DriverSettings(
+      "fakemaster",
+      Optional.<Protos.Credential>absent(),
+      FrameworkInfo.newBuilder()
+          .setUser("framework user")
+          .setName("test framework")
+          .build());
+
   private ExecutorService executor = Executors.newCachedThreadPool(
       new ThreadFactoryBuilder().setNameFormat("SchedulerIT-%d").setDaemon(true).build());
   private AtomicReference<Optional<RuntimeException>> mainException =
@@ -176,6 +192,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
       @Override
       protected void configure() {
         bind(DriverFactory.class).toInstance(driverFactory);
+        bind(DriverSettings.class).toInstance(SETTINGS);
         bind(Log.class).toInstance(log);
         bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
         install(new BackupModule(backupDir, SnapshotStoreImpl.class));
@@ -297,7 +314,13 @@ public class SchedulerIT extends BaseZooKeeperTest {
 
   @Test
   public void testLaunch() throws Exception {
-    expect(driverFactory.apply(null)).andReturn(driver).anyTimes();
+    Capture<Scheduler> scheduler = createCapture();
+    expect(driverFactory.create(
+        capture(scheduler),
+        eq(SETTINGS.getCredentials()),
+        eq(SETTINGS.getFrameworkInfo()),
+        eq(SETTINGS.getMasterUri())))
+        .andReturn(driver).anyTimes();
 
     ScheduledTask snapshotTask = makeTask("snapshotTask", ScheduleStatus.ASSIGNED);
     ScheduledTask transactionTask = makeTask("transactionTask", ScheduleStatus.RUNNING);
@@ -346,7 +369,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
     startScheduler();
 
     driverStarted.await();
-    injector.getInstance(Scheduler.class).registered(driver,
+    scheduler.getValue().registered(driver,
         FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MasterInfo.getDefaultInstance());
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
index 98718c4..f4214fb 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeMaster.java
@@ -25,16 +25,21 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.scheduler.app.local.simulator.Events.Started;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.Filters;
 import org.apache.mesos.Protos.FrameworkID;
@@ -53,11 +58,13 @@ import org.apache.mesos.SchedulerDriver;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.mesos.Protos.FrameworkInfo;
+
 /**
  * A simulated master for use in scheduler testing.
  */
 @SuppressWarnings("deprecation")
-public class FakeMaster implements SchedulerDriver {
+public class FakeMaster implements SchedulerDriver, DriverFactory {
 
   private static final Logger LOG = Logger.getLogger(FakeMaster.class.getName());
 
@@ -71,12 +78,11 @@ public class FakeMaster implements SchedulerDriver {
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final CountDownLatch stopped = new CountDownLatch(1);
 
-  private final Scheduler scheduler;
+  private final SettableFuture<Scheduler> schedulerFuture = SettableFuture.create();
   private final EventBus eventBus;
 
   @Inject
-  FakeMaster(Scheduler scheduler, EventBus eventBus) {
-    this.scheduler = requireNonNull(scheduler);
+  FakeMaster(EventBus eventBus) {
     this.eventBus = requireNonNull(eventBus);
   }
 
@@ -95,17 +101,28 @@ public class FakeMaster implements SchedulerDriver {
     assertNotStopped();
 
     checkState(activeTasks.containsKey(task), "Task " + task + " does not exist.");
-    scheduler.statusUpdate(this, TaskStatus.newBuilder()
+    Futures.getUnchecked(schedulerFuture).statusUpdate(this, TaskStatus.newBuilder()
         .setTaskId(task)
         .setState(state)
         .build());
   }
 
   @Override
+  public SchedulerDriver create(
+      Scheduler scheduler,
+      Optional<Protos.Credential> credentials,
+      FrameworkInfo frameworkInfo,
+      String master) {
+
+    schedulerFuture.set(scheduler);
+    return this;
+  }
+
+  @Override
   public Status start() {
     assertNotStopped();
 
-    scheduler.registered(this,
+    Futures.getUnchecked(schedulerFuture).registered(this,
         FrameworkID.newBuilder().setValue("local").build(),
         MasterInfo.getDefaultInstance());
 
@@ -127,7 +144,7 @@ public class FakeMaster implements SchedulerDriver {
             if (allOffers.isEmpty()) {
               LOG.info("All offers consumed, suppressing offer cycle.");
             } else {
-              scheduler.resourceOffers(FakeMaster.this, allOffers);
+              Futures.getUnchecked(schedulerFuture).resourceOffers(FakeMaster.this, allOffers);
             }
           }
         },
@@ -182,7 +199,7 @@ public class FakeMaster implements SchedulerDriver {
 
   private void checkState(boolean assertion, String failureMessage) {
     if (!assertion) {
-      scheduler.error(this, failureMessage);
+      Futures.getUnchecked(schedulerFuture).error(this, failureMessage);
       stop();
       throw new IllegalStateException(failureMessage);
     }
@@ -217,10 +234,12 @@ public class FakeMaster implements SchedulerDriver {
         new Runnable() {
           @Override
           public void run() {
-            scheduler.statusUpdate(FakeMaster.this, TaskStatus.newBuilder()
-                .setTaskId(task.getTaskId())
-                .setState(TaskState.TASK_RUNNING)
-                .build());
+            Futures.getUnchecked(schedulerFuture).statusUpdate(
+                FakeMaster.this,
+                TaskStatus.newBuilder()
+                    .setTaskId(task.getTaskId())
+                    .setState(TaskState.TASK_RUNNING)
+                    .build());
           }
         },
         1,
@@ -247,7 +266,7 @@ public class FakeMaster implements SchedulerDriver {
     checkState(task != null, "Task " + taskId + " not found.");
     idleOffers.put(task.getOffer().getId(), task.getOffer());
 
-    scheduler.statusUpdate(this, TaskStatus.newBuilder()
+    Futures.getUnchecked(schedulerFuture).statusUpdate(this, TaskStatus.newBuilder()
         .setTaskId(taskId)
         .setState(TaskState.TASK_FINISHED)
         .build());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 43d73b1..640acdf 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -16,10 +16,9 @@ package org.apache.aurora.scheduler.app.local;
 import java.io.File;
 import java.util.List;
 
-import javax.annotation.Nullable;
-import javax.inject.Inject;
 import javax.inject.Singleton;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
@@ -29,13 +28,15 @@ import com.twitter.common.application.AppLauncher;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.DriverFactory;
 import org.apache.aurora.scheduler.app.SchedulerMain;
 import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.log.LogStorage;
+import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
 
 /**
@@ -43,6 +44,14 @@ import org.apache.mesos.SchedulerDriver;
  */
 public class LocalSchedulerMain extends SchedulerMain {
 
+  private static final DriverSettings DRIVER_SETTINGS = new DriverSettings(
+      "fakemaster",  // presumably the master address - TODO confirm against DriverSettings
+      Optional.<Protos.Credential>absent(),  // no credentials supplied
+      Protos.FrameworkInfo.newBuilder()
+          .setUser("framework user")
+          .setName("test framework")
+          .build());
+
   @Override
   protected Module getPersistentStorageModule() {
     return new AbstractModule() {
@@ -65,28 +74,15 @@ public class LocalSchedulerMain extends SchedulerMain {
     return new AbstractModule() {
       @Override
       protected void configure() {
-        bind(DriverFactory.class).to(FakeDriverFactory.class);
+        bind(DriverSettings.class).toInstance(DRIVER_SETTINGS);
         bind(SchedulerDriver.class).to(FakeMaster.class);
+        bind(DriverFactory.class).to(FakeMaster.class);
         bind(FakeMaster.class).in(Singleton.class);
         install(new ClusterSimulatorModule());
       }
     };
   }
 
-  static class FakeDriverFactory implements DriverFactory {
-    private final SchedulerDriver driver;
-
-    @Inject
-    FakeDriverFactory(SchedulerDriver driver) {
-      this.driver = driver;
-    }
-
-    @Override
-    public SchedulerDriver apply(@Nullable String input) {
-      return driver;
-    }
-  }
-
   public static void main(String[] args) {
     File backupDir = Files.createTempDir();
     backupDir.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 62d8aab..962aff8 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -29,9 +29,9 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 059a276..758a8d4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -35,11 +35,11 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.FrameworkID;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
index 8ad9f5c..662ebdc 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
@@ -37,10 +37,10 @@ import com.twitter.common.util.BackoffStrategy;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index 15fb7ff..e2a198a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -30,11 +30,11 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.async.OfferQueue.LaunchException;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskInfo;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
index 1d84496..51256f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
@@ -18,7 +18,6 @@ import java.util.concurrent.ScheduledFuture;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
-
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 1caaf14..7736d4c 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -40,7 +40,6 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -50,6 +49,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index e79327c..17295ac 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
@@ -57,7 +56,7 @@ import static org.junit.Assert.assertEquals;
 public class TaskTimeoutTest extends EasyMockTest {
 
   private static final String TASK_ID = "task_id";
-  private static final long TIMEOUT_MS = Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS);
+  private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
 
   private AtomicLong timedOutTaskCounter;
   private ScheduledExecutorService executor;
@@ -81,25 +80,22 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   private void replayAndCreate() {
     control.replay();
-    timeout = new TaskTimeout(
-        executor,
-        stateManager,
-        Amount.of(TIMEOUT_MS, Time.MILLISECONDS),
-        statsProvider);
+    timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+    timeout.startAsync().awaitRunning();
   }
 
-  private Capture<Runnable> expectTaskWatch(long expireMs) {
+  private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
     Capture<Runnable> capture = createCapture();
     executor.schedule(
         EasyMock.capture(capture),
-        eq(expireMs),
-        eq(TimeUnit.MILLISECONDS));
+        eq((long) expireIn.getValue()),
+        eq(expireIn.getUnit().getTimeUnit()));
     expectLastCall().andReturn(future);
     return capture;
   }
 
   private Capture<Runnable> expectTaskWatch() {
-    return expectTaskWatch(TIMEOUT_MS);
+    return expectTaskWatch(TIMEOUT);
   }
 
   private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
@@ -197,17 +193,17 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   @Test
   public void testStorageStart() {
-    expectTaskWatch(TIMEOUT_MS);
-    expectTaskWatch(TIMEOUT_MS);
-    expectTaskWatch(TIMEOUT_MS);
+    expectTaskWatch(TIMEOUT);
+    expectTaskWatch(TIMEOUT);
+    expectTaskWatch(TIMEOUT);
 
     replayAndCreate();
 
-    clock.setNowMillis(TIMEOUT_MS * 2);
+    clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2);
     for (IScheduledTask task : ImmutableList.of(
         makeTask("a", ASSIGNED, 0),
-        makeTask("b", KILLING, TIMEOUT_MS),
-        makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT_MS))) {
+        makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)),
+        makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) {
 
       timeout.recordStateChange(TaskStateChange.initialized(task));
     }
@@ -216,4 +212,20 @@ public class TaskTimeoutTest extends EasyMockTest {
     changeState("b", KILLING, KILLED);
     changeState("c", PREEMPTING, FINISHED);
   }
+
+  @Test
+  public void testTimeoutWhileNotStarted() throws Exception {
+    // The service is constructed but never started, so when the timer fires it must leave the
+    // task untouched and only re-arm itself with the NOT_STARTED_RETRY delay.
+    Capture<Runnable> assignedTimeout = expectTaskWatch();
+    expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY);
+
+    control.replay();
+    timeout = new TaskTimeout(executor, stateManager, TIMEOUT, statsProvider);
+
+    changeState(INIT, PENDING);
+    changeState(PENDING, ASSIGNED);
+    assignedTimeout.getValue().run();
+    assertEquals(0, timedOutTaskCounter.intValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
new file mode 100644
index 0000000..9e17688
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Throwables;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests credential parsing in {@link CommandLineDriverSettingsModule}.
+ */
+public class CommandLineDriverSettingsModuleTest {
+
+  /** Writes {@code properties} out with {@link Properties#store} and returns it as a stream. */
+  private static ByteArrayInputStream asStream(Properties properties) {
+    ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+    try {
+      properties.store(serialized, "");
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    return new ByteArrayInputStream(serialized.toByteArray());
+  }
+
+  /** Expects an IllegalStateException when only the principal key is present. */
+  @Test(expected = IllegalStateException.class)
+  public void testMissingPropertiesParsing() {
+    Properties principalOnly = new Properties();
+    principalOnly.put(CommandLineDriverSettingsModule.PRINCIPAL_KEY, "aurora-scheduler");
+
+    CommandLineDriverSettingsModule.parseCredentials(asStream(principalOnly));
+  }
+
+  /** Expects the parsed result to equal the properties that were serialized. */
+  @Test
+  public void testPropertiesParsing() {
+    Properties credentials = new Properties();
+    credentials.put(CommandLineDriverSettingsModule.PRINCIPAL_KEY, "aurora-scheduler");
+    credentials.put(CommandLineDriverSettingsModule.SECRET_KEY, "secret");
+
+    assertEquals(
+        credentials,
+        CommandLineDriverSettingsModule.parseCredentials(asStream(credentials)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
new file mode 100644
index 0000000..af15c95
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.testing.TearDown;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Command;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.SchedulerDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertTrue;
+
+public class MesosSchedulerImplTest extends EasyMockTest {
+
+  private static final String FRAMEWORK_ID = "framework-id";
+  private static final FrameworkID FRAMEWORK =
+      FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+
+  private static final String SLAVE_HOST = "slave-hostname";
+  private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("slave-id").build();
+  private static final String SLAVE_HOST_2 = "slave-hostname-2";
+  private static final SlaveID SLAVE_ID_2 = SlaveID.newBuilder().setValue("slave-id-2").build();
+  private static final ExecutorID EXECUTOR_ID =
+      ExecutorID.newBuilder().setValue("executor-id").build();
+
+  private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
+  private static final Offer OFFER = Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setSlaveId(SLAVE_ID)
+      .setHostname(SLAVE_HOST)
+      .setId(OFFER_ID)
+      .build();
+  private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
+  private static final Offer OFFER_2 = Offer.newBuilder(OFFER)  // starts from OFFER
+      .setSlaveId(SLAVE_ID_2)
+      .setHostname(SLAVE_HOST_2)
+      .setId(OFFER_ID_2)
+      .build();
+
+  private static final TaskStatus STATUS = TaskStatus.newBuilder()
+      .setState(TaskState.TASK_RUNNING)
+      .setTaskId(TaskID.newBuilder().setValue("task-id").build())
+      .build();
+
+  private StorageTestUtil storageUtil;
+  private TaskLauncher systemLauncher;
+  private TaskLauncher userLauncher;
+  private SchedulerDriver driver;
+  private EventSink eventSink;
+
+  private MesosSchedulerImpl scheduler;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    final Lifecycle lifecycle =
+        new Lifecycle(createMock(Command.class), createMock(UncaughtExceptionHandler.class));
+    systemLauncher = createMock(TaskLauncher.class);
+    userLauncher = createMock(TaskLauncher.class);
+    eventSink = createMock(EventSink.class);
+
+    Injector injector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(Storage.class).toInstance(storageUtil.storage);
+        bind(Lifecycle.class).toInstance(lifecycle);
+        bind(new TypeLiteral<List<TaskLauncher>>() { })
+            .toInstance(Arrays.asList(systemLauncher, userLauncher));  // system, then user
+        bind(EventSink.class).toInstance(eventSink);
+        bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
+            .toInstance(MoreExecutors.sameThreadExecutor());  // runs tasks inline
+      }
+    });
+    scheduler = injector.getInstance(MesosSchedulerImpl.class);
+    driver = createMock(SchedulerDriver.class);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBadOrdering() {
+    control.replay();
+
+    // Should fail since the scheduler is not yet registered.
+    scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
+  }
+
+  @Test
+  public void testNoOffers() throws Exception {
+    new RegisteredFixture() {
+      @Override
+      void test() {
+        scheduler.resourceOffers(driver, ImmutableList.<Offer>of());
+      }
+    }.run();
+  }
+
+  @Test
+  public void testNoAccepts() throws Exception {
+    new OfferFixture() {
+      @Override
+      void respondToOffer() throws Exception {
+        expectOfferAttributesSaved(OFFER);
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(false);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testOfferFirstAccepts() throws Exception {
+    new OfferFixture() {
+      @Override
+      void respondToOffer() throws Exception {
+        expectOfferAttributesSaved(OFFER);
+        expect(systemLauncher.willUse(OFFER)).andReturn(true);  // no userLauncher expectation
+      }
+    }.run();
+  }
+
+  @Test
+  public void testOfferSchedulerAccepts() throws Exception {
+    new OfferFixture() {
+      @Override
+      void respondToOffer() throws Exception {
+        expectOfferAttributesSaved(OFFER);
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(true);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testStatusUpdateNoAccepts() throws Exception {
+    new StatusFixture() {
+      @Override
+      void expectations() throws Exception {
+        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+        expect(userLauncher.statusUpdate(STATUS)).andReturn(false);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testStatusUpdateFirstAccepts() throws Exception {
+    new StatusFixture() {
+      @Override
+      void expectations() throws Exception {
+        expect(systemLauncher.statusUpdate(STATUS)).andReturn(true);  // no userLauncher call
+      }
+    }.run();
+  }
+
+  @Test
+  public void testStatusUpdateSecondAccepts() throws Exception {
+    new StatusFixture() {
+      @Override
+      void expectations() throws Exception {
+        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+        expect(userLauncher.statusUpdate(STATUS)).andReturn(true);
+      }
+    }.run();
+  }
+
+  @Test(expected = SchedulerException.class)
+  public void testStatusUpdateFails() throws Exception {
+    new StatusFixture() {
+      @Override
+      void expectations() throws Exception {
+        expect(systemLauncher.statusUpdate(STATUS)).andReturn(false);
+        expect(userLauncher.statusUpdate(STATUS)).andThrow(new StorageException("Injected."));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testMultipleOffers() throws Exception {
+    new RegisteredFixture() {
+      @Override
+      void expectations() throws Exception {
+        expectOfferAttributesSaved(OFFER);
+        expectOfferAttributesSaved(OFFER_2);
+        expect(systemLauncher.willUse(OFFER)).andReturn(false);
+        expect(userLauncher.willUse(OFFER)).andReturn(true);
+        expect(systemLauncher.willUse(OFFER_2)).andReturn(false);
+        expect(userLauncher.willUse(OFFER_2)).andReturn(false);
+      }
+
+      @Override
+      void test() {
+        scheduler.resourceOffers(driver, ImmutableList.of(OFFER, OFFER_2));
+      }
+    }.run();
+  }
+
+  @Test
+  public void testDisconnected() throws Exception {
+    new RegisteredFixture() {
+      @Override
+      void expectations() throws Exception {
+        eventSink.post(new DriverDisconnected());  // recorded as an expectation
+      }
+
+      @Override
+      void test() {
+        scheduler.disconnected(driver);
+      }
+    }.run();
+  }
+
+  @Test
+  public void testFrameworkMessageIgnored() throws Exception {
+    control.replay();  // no expectations recorded
+
+    scheduler.frameworkMessage(
+        driver,
+        EXECUTOR_ID,
+        SLAVE_ID,
+        "hello".getBytes(StandardCharsets.UTF_8));
+  }
+
+  private void expectOfferAttributesSaved(Offer offer) {
+    storageUtil.attributeStore.saveHostAttributes(Conversions.getAttributes(offer));
+  }
+
+  private abstract class RegisteredFixture {
+    private final AtomicBoolean runCalled = new AtomicBoolean(false);
+
+    RegisteredFixture() throws Exception {
+      // Fail at tear-down if a test builds a fixture but forgets to call run().
+      addTearDown(new TearDown() {
+        @Override
+        public void tearDown() {
+          assertTrue(runCalled.get());
+        }
+      });
+    }
+
+    void run() throws Exception {
+      runCalled.set(true);
+      eventSink.post(new DriverRegistered());  // expectation, recorded before replay
+      storageUtil.expectOperations();
+      storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
+      expectations();  // subclass-specific expectations
+
+      control.replay();
+
+      scheduler.registered(driver, FRAMEWORK, MasterInfo.getDefaultInstance());
+      test();  // exercised only after registration
+    }
+
+    void expectations() throws Exception {
+      // Default no-op, subclasses may override.
+    }
+
+    abstract void test();
+  }
+
+  private abstract class OfferFixture extends RegisteredFixture {
+    OfferFixture() throws Exception {
+      super();
+    }
+
+    abstract void respondToOffer() throws Exception;
+
+    @Override
+    void expectations() throws Exception {
+      respondToOffer();
+    }
+
+    @Override
+    void test() {
+      scheduler.resourceOffers(driver, ImmutableList.of(OFFER));  // single-offer scenario
+    }
+  }
+
+  private abstract class StatusFixture extends RegisteredFixture {
+    StatusFixture() throws Exception {
+      super();
+    }
+
+    @Override
+    void test() {
+      scheduler.statusUpdate(driver, STATUS);
+    }
+  }
+}