You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/03 20:47:14 UTC

[1/2] aurora git commit: Centralizing offer/task matching in TaskAssigner.

Repository: aurora
Updated Branches:
  refs/heads/master 1c0086ffc -> fb0325065


http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
deleted file mode 100644
index 9a91e63..0000000
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
+++ /dev/null
@@ -1,671 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.scheduling;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.offers.Offers;
-import org.apache.aurora.scheduler.preemptor.BiCache;
-import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.apache.mesos.Protos.Offer;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.junit.Assert.assertEquals;
-
-/**
- * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl.
- */
-public class TaskSchedulerTest extends EasyMockTest {
-
-  private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
-
-  private static final HostOffer OFFER_A =  makeOffer("OFFER_A", "HOST_A", NONE);
-  private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
-  private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
-  private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
-  private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue();
-  private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue();
-  private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue();
-
-  private Storage storage;
-
-  private MaintenanceController maintenance;
-  private StateManager stateManager;
-  private TaskAssigner assigner;
-  private BackoffStrategy retryStrategy;
-  private Driver driver;
-  private ScheduledExecutorService executor;
-  private ScheduledFuture<?> future;
-  private OfferReturnDelay returnDelay;
-  private OfferManager offerManager;
-  private TaskGroups taskGroups;
-  private RescheduleCalculator rescheduleCalculator;
-  private Preemptor preemptor;
-  private BiCache<String, TaskGroupKey> reservations;
-
-  @Before
-  public void setUp() {
-    storage = DbUtil.createStorage();
-    maintenance = createMock(MaintenanceController.class);
-    stateManager = createMock(StateManager.class);
-    assigner = createMock(TaskAssigner.class);
-    retryStrategy = createMock(BackoffStrategy.class);
-    driver = createMock(Driver.class);
-    executor = createMock(ScheduledExecutorService.class);
-    future = createMock(ScheduledFuture.class);
-    returnDelay = createMock(OfferReturnDelay.class);
-    rescheduleCalculator = createMock(RescheduleCalculator.class);
-    preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
-  }
-
-  private void replayAndCreateScheduler() {
-    control.replay();
-    offerManager = new OfferManagerImpl(driver, returnDelay, executor);
-    TaskScheduler scheduler = new TaskSchedulerImpl(storage,
-        stateManager,
-        assigner,
-        offerManager,
-        preemptor,
-        reservations);
-    taskGroups = new TaskGroups(
-        executor,
-        Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
-        retryStrategy,
-        RateLimiter.create(100),
-        scheduler,
-        rescheduleCalculator);
-  }
-
-  private Capture<Runnable> expectOffer() {
-    return expectOfferDeclineIn(10);
-  }
-
-  private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
-    expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
-    Capture<Runnable> runnable = createCapture();
-    executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(createMock(ScheduledFuture.class));
-    return runnable;
-  }
-
-  private void changeState(
-      IScheduledTask task,
-      ScheduleStatus oldState,
-      ScheduleStatus newState) {
-
-    final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState));
-    // Insert the task if it doesn't already exist.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) {
-          taskStore.saveTasks(ImmutableSet.of(copy));
-        }
-      }
-    });
-    taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
-  }
-
-  private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        capture(capture),
-        eq(penaltyMs),
-        eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(future);
-    return capture;
-  }
-
-  private Capture<Runnable> expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) {
-    expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs);
-    return expectTaskRetryIn(nextPenaltyMs);
-  }
-
-  @Test
-  public void testNoTasks() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-  }
-
-  @Test
-  public void testNoOffers() {
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    IScheduledTask task = createTask("a");
-    expectPreemptorCall(task.getAssignedTask());
-    expectReservationCheck(task);
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    timeoutCapture.getValue().run();
-  }
-
-  private IScheduledTask createTask(String taskId) {
-    return createTask(taskId, null);
-  }
-
-  private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) {
-    return setStatus(makeTask(taskId, TaskTestUtil.JOB), status);
-  }
-
-  private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) {
-    return IScheduledTask.build(task.newBuilder().setStatus(status));
-  }
-
-  @Test
-  public void testLoadFromStorage() {
-    final IScheduledTask a = createTask("a", KILLED);
-    final IScheduledTask b = createTask("b", PENDING);
-    final IScheduledTask c = createTask("c", RUNNING);
-
-    expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
-    expectTaskRetryIn(10);
-
-    replayAndCreateScheduler();
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
-      }
-    });
-    for (IScheduledTask task : ImmutableList.of(a, b, c)) {
-      taskGroups.taskChangedState(TaskStateChange.initialized(task));
-    }
-    changeState(c, RUNNING, FINISHED);
-  }
-
-  @Test
-  public void testTaskMissing() {
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT));
-    timeoutCapture.getValue().run();
-  }
-
-  private IExpectationSetters<Assignment> expectMaybeAssign(
-      HostOffer offer,
-      IScheduledTask task,
-      AttributeAggregate jobAggregate) {
-
-    return expect(assigner.maybeAssign(
-        EasyMock.anyObject(),
-        eq(offer),
-        eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)),
-        eq(Tasks.id(task))));
-  }
-
-  private IExpectationSetters<?> expectNoReservation(String slaveId) {
-    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
-  }
-
-  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of());
-  }
-
-  @Test
-  public void testTaskAssigned() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-
-    IScheduledTask taskA = createTask("a", PENDING);
-    TaskInfo mesosTask = makeTaskInfo(taskA);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectNoReservation(SLAVE_A).times(2);
-    expectReservationCheck(taskA);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure());
-    expectPreemptorCall(taskA.getAssignedTask());
-
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-
-    Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    IScheduledTask taskB = createTask("b");
-    expectReservationCheck(taskB);
-    expectPreemptorCall(taskB.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    changeState(taskA, INIT, PENDING);
-    timeoutCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-
-    // Ensure the offer was consumed.
-    changeState(taskB, INIT, PENDING);
-    timeoutCapture3.getValue().run();
-  }
-
-  @Test
-  public void testDriverNotReady() {
-    IScheduledTask task = createTask("a", PENDING);
-    TaskInfo mesosTask = TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
-        .build();
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-    expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
-    expect(stateManager.changeState(
-        EasyMock.anyObject(),
-        eq("a"),
-        eq(Optional.of(PENDING)),
-        eq(LOST),
-        eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG)))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-  }
-
-  @Test
-  public void testStorageException() {
-    IScheduledTask task = createTask("a", PENDING);
-    TaskInfo mesosTask = TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
-        .build();
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectNoReservation(SLAVE_A).times(2);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
-
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-    expectLastCall();
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-  }
-
-  @Test
-  public void testExpiration() {
-    IScheduledTask task = createTask("a", PENDING);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-    expectAnyMaintenanceCalls();
-    expectNoReservation(SLAVE_A);
-    expectReservationCheck(task).times(2);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectPreemptorCall(task.getAssignedTask());
-    driver.declineOffer(OFFER_A.getOffer().getId());
-    expectTaskGroupBackoff(10, 20);
-    expectPreemptorCall(task.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-    offerExpirationCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-  }
-
-  @Test
-  public void testOneOfferPerSlave() {
-    expectAnyMaintenanceCalls();
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
-    HostOffer offerAB = new HostOffer(
-        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    driver.declineOffer(OFFER_A.getOffer().getId());
-    driver.declineOffer(offerAB.getOffer().getId());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(offerAB);
-    offerExpirationCapture.getValue().run();
-  }
-
-  @Test
-  public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException {
-    expectAnyMaintenanceCalls();
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
-    Function<HostOffer, Assignment> offerAcceptor =
-        createMock(new Clazz<Function<HostOffer, Assignment>>() { });
-    final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo));
-    driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig())));
-    offerExpirationCapture.getValue().run();
-  }
-
-  @Test
-  public void testBasicMaintenancePreferences() {
-    expectOffer();
-    expectOffer();
-    expectOffer();
-    expectOffer();
-
-    IScheduledTask taskA = createTask("A", PENDING);
-    TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
-    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    IScheduledTask taskB = createTask("B", PENDING);
-    TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expectNoReservation(SLAVE_B);
-    expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
-    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
-    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_D);
-    offerManager.addOffer(OFFER_C);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_A);
-
-    changeState(taskA, INIT, PENDING);
-    captureA.getValue().run();
-
-    changeState(taskB, INIT, PENDING);
-    captureB.getValue().run();
-  }
-
-  @Test
-  public void testChangingMaintenancePreferences() {
-    expectOffer();
-    expectOffer();
-    expectOffer();
-
-    IScheduledTask taskA = createTask("A", PENDING);
-    TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectNoReservation(SLAVE_B);
-    expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
-    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
-    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    IScheduledTask taskB = createTask("B", PENDING);
-    TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    HostOffer updatedOfferC = new HostOffer(
-        OFFER_C.getOffer(),
-        IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
-    expectNoReservation(SLAVE_C);
-    expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
-    driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
-    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_C);
-
-    // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
-
-    // Expected order now (B), with (C, A) unschedulable
-    changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
-    changeState(taskA, INIT, PENDING);
-    captureA.getValue().run();
-
-    // Expected order now (C), with (A) unschedulable and (B) already consumed
-    changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
-    changeState(taskB, INIT, PENDING);
-    captureB.getValue().run();
-  }
-
-  private Capture<String> expectTaskScheduled(IScheduledTask task) {
-    TaskInfo mesosTask = makeTaskInfo(task);
-    Capture<String> taskId = createCapture();
-    expect(assigner.maybeAssign(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        capture(taskId))).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(EasyMock.anyObject(), eq(mesosTask));
-    return taskId;
-  }
-
-  @Test
-  public void testResistsStarvation() {
-    // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside
-    // TaskScheduler.  It's time to test using a real ScheduledExecutorService.
-
-    expectAnyMaintenanceCalls();
-
-    IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING);
-
-    ScheduledTask jobA1Builder = jobA0.newBuilder();
-    jobA1Builder.getAssignedTask().setTaskId("a1");
-    jobA1Builder.getAssignedTask().setInstanceId(1);
-    IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder);
-
-    ScheduledTask jobA2Builder = jobA0.newBuilder();
-    jobA2Builder.getAssignedTask().setTaskId("a2");
-    jobA2Builder.getAssignedTask().setInstanceId(2);
-    IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder);
-
-    IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING);
-
-    expectNoReservation(SLAVE_A);
-    expectNoReservation(SLAVE_B);
-
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-
-    Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    Capture<String> firstScheduled = expectTaskScheduled(jobA0);
-    Capture<String> secondScheduled = expectTaskScheduled(jobB0);
-
-    // Expect another watch of the task group for job A.
-    expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_C);
-    offerManager.addOffer(OFFER_D);
-    changeState(jobA0, INIT, PENDING);
-    changeState(jobA1, INIT, PENDING);
-    changeState(jobA2, INIT, PENDING);
-    changeState(jobB0, INIT, PENDING);
-    timeoutA.getValue().run();
-    timeoutB.getValue().run();
-    assertEquals(
-        ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
-        ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
-  }
-
-  @Test
-  public void testTaskDeleted() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-
-    final IScheduledTask task = createTask("a", PENDING);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
-    expectReservationCheck(task);
-    expectPreemptorCall(task.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    changeState(task, INIT, PENDING);
-    timeoutCapture.getValue().run();
-
-    // Ensure the offer was consumed.
-    changeState(task, INIT, PENDING);
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
-      }
-    });
-    taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
-    timeoutCapture.getValue().run();
-  }
-
-  private TaskInfo makeTaskInfo(IScheduledTask task) {
-    return TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString()))
-        .build();
-  }
-
-  private void expectAnyMaintenanceCalls() {
-    expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
-  }
-
-  private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
-    offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
-        IHostAttributes.build(attributes.newBuilder().setMode(mode))));
-  }
-
-  private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
-    Offer offer = Offers.makeOffer(offerId, hostName);
-    return new HostOffer(
-        offer,
-        IHostAttributes.build(new HostAttributes()
-            .setHost(hostName)
-            .setSlaveId(offer.getSlaveId().getValue())
-            .setAttributes(ImmutableSet.of())
-            .setMode(mode)));
-  }
-
-  private void expectPreemptorCall(IAssignedTask task) {
-    expect(preemptor.attemptPreemptionFor(
-        eq(task),
-        eq(EMPTY),
-        EasyMock.anyObject())).andReturn(Optional.absent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index f98818f..c9c6f5d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -20,17 +21,19 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -46,11 +49,16 @@ import org.apache.mesos.Protos.Value.Type;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TaskAssignerImplTest extends EasyMockTest {
 
@@ -74,8 +82,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
           .setAssignedTask(new AssignedTask()
               .setTaskId("taskId")
               .setTask(new TaskConfig()
+                  .setJob(new JobKey("r", "e", "n"))
                   .setExecutorConfig(new ExecutorConfig().setData("opaque data"))
                   .setRequestedPorts(ImmutableSet.of(PORT_NAME)))));
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
       .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
@@ -86,19 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private StateManager stateManager;
   private SchedulingFilter filter;
   private MesosTaskFactory taskFactory;
+  private OfferManager offerManager;
   private TaskAssigner assigner;
 
   @Before
   public void setUp() throws Exception {
     storeProvider = createMock(MutableStoreProvider.class);
-    stateManager = createMock(StateManager.class);
     filter = createMock(SchedulingFilter.class);
     taskFactory = createMock(MesosTaskFactory.class);
-    assigner = new TaskAssignerImpl(stateManager, filter, taskFactory);
+    stateManager = createMock(StateManager.class);
+    offerManager = createMock(OfferManager.class);
+    assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager);
   }
 
   @Test
-  public void testAssignNoVetoes() {
+  public void testAssignNoVetoes() throws Exception {
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expect(filter.filter(
         new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
@@ -115,17 +129,18 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(
-        Assignment.success(TASK_INFO),
-        assigner.maybeAssign(
-            storeProvider,
-            OFFER,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
-            Tasks.id(TASK)));
+    assertTrue(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        Optional.of(MESOS_OFFER.getSlaveId().getValue())));
   }
 
   @Test
-  public void testAssignVetoes() {
+  public void testAssignVetoesWithStaticBan() throws Exception {
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
     expect(filter.filter(
         new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
@@ -133,12 +148,79 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(
-        Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))),
-        assigner.maybeAssign(
-            storeProvider,
-            OFFER,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
-            Tasks.id(TASK)));
+    assertFalse(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testAssignVetoesWithNoStaticBan() throws Exception {
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(filter.filter(
+        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+        .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
+
+    control.replay();
+
+    assertFalse(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testAssignmentClearedOnError() throws Exception {
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(filter.filter(
+        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+        .andReturn(ImmutableSet.of());
+    expect(stateManager.assignTask(
+        storeProvider,
+        Tasks.id(TASK),
+        MESOS_OFFER.getHostname(),
+        MESOS_OFFER.getSlaveId(),
+        ImmutableMap.of(PORT_NAME, PORT)))
+        .andReturn(TASK.getAssignedTask());
+    expect(stateManager.changeState(
+        storeProvider,
+        Tasks.id(TASK),
+        Optional.of(PENDING),
+        LOST,
+        LAUNCH_FAILED_MSG))
+        .andReturn(StateChangeResult.SUCCESS);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertFalse(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+
+    control.replay();
+
+    assertFalse(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        Optional.of("invalid")));
   }
 }


[2/2] aurora git commit: Centralizing offer/task matching in TaskAssigner.

Posted by ma...@apache.org.
Centralizing offer/task matching in TaskAssigner.

Bugs closed: AURORA-1416

Reviewed at https://reviews.apache.org/r/37001/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/fb032506
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/fb032506
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/fb032506

Branch: refs/heads/master
Commit: fb032506529894628d9e0f85a0ded095c938bf49
Parents: 1c0086f
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon Aug 3 11:46:58 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon Aug 3 11:46:58 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |   5 +-
 .../benchmark/fakes/FakeOfferManager.java       |  20 +-
 .../aurora/scheduler/offers/OfferManager.java   | 148 ++--
 .../scheduler/scheduling/SchedulingModule.java  |   2 +-
 .../scheduler/scheduling/TaskScheduler.java     | 101 +--
 .../aurora/scheduler/state/TaskAssigner.java    | 220 +++---
 .../scheduler/offers/OfferManagerImplTest.java  | 164 +++--
 .../scheduling/TaskSchedulerImplTest.java       | 216 +++---
 .../scheduler/scheduling/TaskSchedulerTest.java | 671 -------------------
 .../scheduler/state/TaskAssignerImplTest.java   | 122 +++-
 10 files changed, 487 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 5bc73d5..d75f090 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -38,6 +38,7 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
 import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -117,7 +118,9 @@ public class SchedulingBenchmarks {
           new PrivateModule() {
             @Override
             protected void configure() {
-              bind(ScheduledExecutorService.class).toInstance(executor);
+              bind(ScheduledExecutorService.class)
+                  .annotatedWith(AsyncModule.AsyncExecutor.class)
+                  .toInstance(executor);
               bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
               bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
               bind(OfferManager.OfferReturnDelay.class).toInstance(

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f413301..fbd24ea 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -13,14 +13,12 @@
  */
 package org.apache.aurora.benchmark.fakes;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.mesos.Protos;
 
 public class FakeOfferManager implements OfferManager {
@@ -30,15 +28,23 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public void cancelOffer(Protos.OfferID offer) {
+  public void cancelOffer(Protos.OfferID offerId) {
     // no-op
   }
 
   @Override
-  public boolean launchFirst(
-      Function<HostOffer, TaskAssigner.Assignment> acceptor,
-      TaskGroupKey groupKey) throws LaunchException {
-    return false;
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo taskInfo) throws LaunchException {
+    // no-op
+  }
+
+  @Override
+  public void banOffer(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    // no-op
+  }
+
+  @Override
+  public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 14bf265..4b8a55f 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
@@ -29,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -47,8 +47,8 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 
@@ -76,22 +76,27 @@ public interface OfferManager extends EventSubscriber {
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
    * tasks against the offer.
    *
-   * @param offer Canceled offer.
+   * @param offerId Canceled offer.
    */
-  void cancelOffer(OfferID offer);
+  void cancelOffer(OfferID offerId);
 
   /**
-   * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
+   * Excludes an offer that results in a static mismatch from further attempts to match against all
+   * tasks from the same group.
    *
-   * @param acceptor Function that determines if an offer is accepted.
-   * @param groupKey Task group key.
-   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
-   *         {@code acceptor}.
-   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
-   *                         task.
+   * @param offerId Offer ID to exclude for the given {@code groupKey}.
+   * @param groupKey Task group key to exclude.
    */
-  boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-      throws LaunchException;
+  void banOffer(OfferID offerId, TaskGroupKey groupKey);
+
+  /**
+   * Launches the task matched against the offer.
+   *
+   * @param offerId Matched offer ID.
+   * @param task Matched task info.
+   * @throws LaunchException If there was an error launching the task.
+   */
+  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
 
   /**
    * Notifies the offer queue that a host's attributes have changed.
@@ -108,6 +113,14 @@ public interface OfferManager extends EventSubscriber {
   Iterable<HostOffer> getOffers();
 
   /**
+   * Gets all offers that are not statically banned for the given {@code groupKey}.
+   *
+   * @param groupKey Task group key to check offers for.
+   * @return A snapshot of all offers eligible for the given {@code groupKey}.
+   */
+  Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+
+  /**
    * Gets an offer for the given slave ID.
    *
    * @param slaveId Slave ID to get offer for.
@@ -127,7 +140,8 @@ public interface OfferManager extends EventSubscriber {
    * Thrown when there was an unexpected failure trying to launch a task.
    */
   class LaunchException extends Exception {
-    LaunchException(String msg) {
+    @VisibleForTesting
+    public LaunchException(String msg) {
       super(msg);
     }
 
@@ -218,6 +232,11 @@ public interface OfferManager extends EventSubscriber {
     }
 
     @Override
+    public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+      return hostOffers.getWeaklyConsistentOffers(groupKey);
+    }
+
+    @Override
     public Optional<HostOffer> getOffer(SlaveID slaveId) {
       return hostOffers.get(slaveId);
     }
@@ -268,7 +287,7 @@ public interface OfferManager extends EventSubscriber {
       private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
       // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
       // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
-      // scheduling attempts. See Assignment.Result for more details on static ban.
+      // scheduling attempts. See VetoGroup for more details on static ban.
       private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
 
       HostOffers() {
@@ -304,7 +323,7 @@ public interface OfferManager extends EventSubscriber {
         if (offer != null) {
           // Remove and re-add a host's offer to re-sort based on its new hostStatus
           remove(offer.getOffer().getId());
-          add(new HostOffer(offer.getOffer(),  attributes));
+          add(new HostOffer(offer.getOffer(), attributes));
         }
       }
 
@@ -312,27 +331,14 @@ public interface OfferManager extends EventSubscriber {
         return Iterables.unmodifiableIterable(offers);
       }
 
-      synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
-        boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine(String.format(
-              "Host offer %s is statically banned for %s: %s",
-              offer,
-              groupKey,
-              result));
-        }
-        return result;
+      synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
+        return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
+            e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey)));
       }
 
-      synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
-        OfferID offerId = offer.getOffer().getId();
+      synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
         if (offersById.containsKey(offerId)) {
           staticallyBannedOffers.put(offerId, groupKey);
-
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine(
-                String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
-          }
         }
       }
 
@@ -345,63 +351,31 @@ public interface OfferManager extends EventSubscriber {
       }
     }
 
-    @Timed("offer_queue_launch_first")
     @Override
-    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-        throws LaunchException {
-
-      // It's important that this method is not called concurrently - doing so would open up the
-      // possibility of a race between the same offers being accepted by different threads.
-
-      for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
-        if (!hostOffers.isStaticallyBanned(offer, groupKey)
-            && acceptOffer(offer, acceptor, groupKey)) {
-          return true;
-        }
-      }
-
-      return false;
+    public void banOffer(OfferID offerId, TaskGroupKey groupKey) {
+      hostOffers.addStaticGroupBan(offerId, groupKey);
     }
 
-    @Timed("offer_queue_accept_offer")
-    protected boolean acceptOffer(
-        HostOffer offer,
-        Function<HostOffer, Assignment> acceptor,
-        TaskGroupKey groupKey) throws LaunchException {
-
-      Assignment assignment = acceptor.apply(offer);
-      switch (assignment.getResult()) {
-
-        case SUCCESS:
-          // Guard against an offer being removed after we grabbed it from the iterator.
-          // If that happens, the offer will not exist in hostOffers, and we can immediately
-          // send it back to LOST for quick reschedule.
-          // Removing while iterating counts on the use of a weakly-consistent iterator being used,
-          // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(offer.getOffer().getId())) {
-            try {
-              driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
-              return true;
-            } catch (IllegalStateException e) {
-              // TODO(William Farner): Catch only the checked exception produced by Driver
-              // once it changes from throwing IllegalStateException when the driver is not yet
-              // registered.
-              throw new LaunchException("Failed to launch task.", e);
-            }
-          } else {
-            offerRaces.incrementAndGet();
-            throw new LaunchException(
-                "Accepted offer no longer exists in offer queue, likely data race.");
-          }
-
-        case FAILURE_STATIC_MISMATCH:
-          // Exclude an offer that results in a static mismatch from further attempts to match
-          // against all tasks from the same group.
-          hostOffers.addStaticGroupBan(offer, groupKey);
-          return false;
-
-        default:
-          return false;
+    @Timed("offer_manager_launch_task")
+    @Override
+    public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+      // Guard against an offer being removed after we grabbed it from the iterator.
+      // If that happens, the offer will not exist in hostOffers, and we can immediately
+      // send it back to LOST for quick reschedule.
+      // Removing while iterating relies on a weakly-consistent iterator,
+      // which is a feature of ConcurrentSkipListSet.
+      if (hostOffers.remove(offerId)) {
+        try {
+          driver.launchTask(offerId, task);
+        } catch (IllegalStateException e) {
+          // TODO(William Farner): Catch only the checked exception produced by Driver
+          // once it changes from throwing IllegalStateException when the driver is not yet
+          // registered.
+          throw new LaunchException("Failed to launch task.", e);
+        }
+      } else {
+        offerRaces.incrementAndGet();
+        throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index c7a1a46..b9dccc6 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
-        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
+        bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class);
         bind(BiCache.BiCacheSettings.class).toInstance(
             new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
         bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d4bd529..0f0bfca 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -23,26 +23,21 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
-import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -56,7 +51,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 
 /**
@@ -91,11 +85,9 @@ public interface TaskScheduler extends EventSubscriber {
     private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
 
     private final Storage storage;
-    private final StateManager stateManager;
     private final TaskAssigner assigner;
-    private final OfferManager offerManager;
     private final Preemptor preemptor;
-    private final BiCache<String, TaskGroupKey> reservations;
+    private final BiCache<TaskGroupKey, String> reservations;
 
     private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
     private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
@@ -104,54 +96,16 @@ public interface TaskScheduler extends EventSubscriber {
     @Inject
     TaskSchedulerImpl(
         Storage storage,
-        StateManager stateManager,
         TaskAssigner assigner,
-        OfferManager offerManager,
         Preemptor preemptor,
-        BiCache<String, TaskGroupKey> reservations) {
+        BiCache<TaskGroupKey, String> reservations) {
 
       this.storage = requireNonNull(storage);
-      this.stateManager = requireNonNull(stateManager);
       this.assigner = requireNonNull(assigner);
-      this.offerManager = requireNonNull(offerManager);
       this.preemptor = requireNonNull(preemptor);
       this.reservations = requireNonNull(reservations);
     }
 
-    private Function<HostOffer, Assignment> getAssignerFunction(
-        final MutableStoreProvider storeProvider,
-        final ResourceRequest resourceRequest,
-        final String taskId) {
-
-      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
-      // and perform the assignment at the very end.  This will allow us to use optimistic locking
-      // at the top of the stack and avoid holding the write lock for too long.
-      return new Function<HostOffer, Assignment>() {
-        @Override
-        public Assignment apply(HostOffer offer) {
-          Optional<TaskGroupKey> reservation =
-              reservations.get(offer.getOffer().getSlaveId().getValue());
-
-          if (reservation.isPresent()) {
-            if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
-              // Slave is reserved to satisfy this task group.
-              return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-            } else {
-              // Slave is reserved for another task.
-              return Assignment.failure();
-            }
-          } else {
-            // Slave is not reserved.
-            return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-          }
-        }
-      };
-    }
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-
     @Timed("task_schedule_attempt")
     @Override
     public boolean schedule(final String taskId) {
@@ -186,35 +140,22 @@ public interface TaskScheduler extends EventSubscriber {
       } else {
         ITaskConfig task = assignedTask.getTask();
         AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-        try {
-          boolean launched = offerManager.launchFirst(
-              getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
-              TaskGroupKey.from(task));
-
-          if (!launched) {
-            // Task could not be scheduled.
-            // TODO(maxim): Now that preemption slots are searched asynchronously, consider
-            // retrying a launch attempt within the current scheduling round IFF a reservation is
-            // available.
-            maybePreemptFor(assignedTask, aggregate, store);
-            attemptsNoMatch.incrementAndGet();
-            return false;
-          }
-        } catch (OfferManager.LaunchException e) {
-          LOG.log(Level.WARNING, "Failed to launch task.", e);
-          attemptsFailed.incrementAndGet();
-
-          // The attempt to schedule the task failed, so we need to backpedal on the
-          // assignment.
-          // It is in the LOST state and a new task will move to PENDING to replace it.
-          // Should the state change fail due to storage issues, that's okay.  The task will
-          // time out in the ASSIGNED state and be moved to LOST.
-          stateManager.changeState(
-              store,
-              taskId,
-              Optional.of(PENDING),
-              LOST,
-              LAUNCH_FAILED_MSG);
+
+        boolean launched = assigner.maybeAssign(
+            store,
+            new ResourceRequest(task, aggregate),
+            TaskGroupKey.from(task),
+            taskId,
+            reservations.get(TaskGroupKey.from(task)));
+
+        if (!launched) {
+          // Task could not be scheduled.
+          // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+          // retrying a launch attempt within the current scheduling round IFF a reservation is
+          // available.
+          maybePreemptFor(assignedTask, aggregate, store);
+          attemptsNoMatch.incrementAndGet();
+          return false;
         }
       }
 
@@ -226,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber {
         AttributeAggregate jobState,
         MutableStoreProvider storeProvider) {
 
-      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+      if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) {
         return;
       }
       Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
       if (slaveId.isPresent()) {
-        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+        reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get());
       }
     }
 
@@ -240,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber {
       if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
         IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
         if (assigned.getSlaveId() != null) {
-          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+          reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 3acb45a..0e32990 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -16,20 +16,22 @@ package org.apache.aurora.scheduler.state;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -37,161 +39,64 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
 
 /**
- * Responsible for matching a task against an offer.
+ * Responsible for matching a task against an offer and launching it.
  */
 public interface TaskAssigner {
-
-  final class Assignment {
-
-    public enum Result {
-      /**
-       * Assignment successful.
-       */
-      SUCCESS,
-
-      /**
-       * Assignment failed.
-       */
-      FAILURE,
-
-      /**
-       * Assignment failed with static mismatch (i.e. all {@link Veto} instances group
-       * as {@link VetoGroup}).
-       * @see VetoGroup#STATIC
-       */
-      FAILURE_STATIC_MISMATCH,
-    }
-
-    private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent();
-    private static final ImmutableSet<Veto> NO_VETOES = ImmutableSet.of();
-    private final Optional<TaskInfo> taskInfo;
-    private final Set<Veto> vetoes;
-
-    private Assignment(Optional<TaskInfo> taskInfo, Set<Veto> vetoes) {
-      this.taskInfo = taskInfo;
-      this.vetoes = vetoes;
-    }
-
-    /**
-     * Creates a successful assignment instance.
-     *
-     * @param taskInfo {@link TaskInfo} to launch.
-     * @return A successful {@link Assignment}.
-     */
-    public static Assignment success(TaskInfo taskInfo) {
-      return new Assignment(Optional.of(taskInfo), NO_VETOES);
-    }
-
-    /**
-     * Creates a failed assignment instance with a set of {@link Veto} applied.
-     *
-     * @param vetoes Set of {@link Veto} instances issued for the failed offer/task match.
-     * @return A failed {@link Assignment}.
-     */
-    public static Assignment failure(Set<Veto> vetoes) {
-      return new Assignment(NO_TASK_INFO, MorePreconditions.checkNotBlank(vetoes));
-    }
-
-    /**
-     * Creates a failed assignment instance.
-     *
-     * @return A failed {@link Assignment}.
-     */
-    public static Assignment failure() {
-      return new Assignment(NO_TASK_INFO, NO_VETOES);
-    }
-
-    /**
-     * Generates the {@link Result} based on the assignment details.
-     *
-     * @return An assignment {@link Result}.
-     */
-    public Result getResult() {
-      if (taskInfo.isPresent()) {
-        return Result.SUCCESS;
-      }
-
-      return Veto.identifyGroup(vetoes) == VetoGroup.STATIC
-          ? Result.FAILURE_STATIC_MISMATCH
-          : Result.FAILURE;
-    }
-
-    /**
-     * A {@link TaskInfo} to launch.
-     *
-     * @return Optional of {@link TaskInfo}.
-     */
-    public Optional<TaskInfo> getTaskInfo() {
-      return taskInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Assignment)) {
-        return false;
-      }
-
-      Assignment other = (Assignment) o;
-
-      return Objects.equal(taskInfo, other.taskInfo)
-          && Objects.equal(vetoes, other.vetoes);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(taskInfo, vetoes);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("taskInfo", taskInfo)
-          .add("vetoes", vetoes)
-          .toString();
-    }
-  }
-
   /**
-   * Tries to match a task against an offer.  If a match is found, the assigner should
-   * make the appropriate changes to the task and provide an {@link Assignment} result.
+   * Tries to match a task against an offer.  If a match is found, the assigner makes the
+   * appropriate changes to the task and requests task launch.
    *
    * @param storeProvider Storage provider.
-   * @param offer The resource offer.
    * @param resourceRequest The request for resources being scheduled.
+   * @param groupKey Task group key.
    * @param taskId Task id to assign.
-   * @return {@link Assignment} with assignment result.
+   * @param slaveReservation Slave reservation for a given {@code groupKey}.
+   * @return Assignment result.
    */
-  Assignment maybeAssign(
+  boolean maybeAssign(
       MutableStoreProvider storeProvider,
-      HostOffer offer,
       ResourceRequest resourceRequest,
-      String taskId);
+      TaskGroupKey groupKey,
+      String taskId,
+      Optional<String> slaveReservation);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
 
+    @VisibleForTesting
+    static final Optional<String> LAUNCH_FAILED_MSG =
+        Optional.of("Unknown exception attempting to schedule task.");
+
+    private final AtomicLong launchFailures = Stats.exportLong("assigner_launch_failures");
+
     private final StateManager stateManager;
     private final SchedulingFilter filter;
     private final MesosTaskFactory taskFactory;
+    private final OfferManager offerManager;
 
     @Inject
     public TaskAssignerImpl(
         StateManager stateManager,
         SchedulingFilter filter,
-        MesosTaskFactory taskFactory) {
+        MesosTaskFactory taskFactory,
+        OfferManager offerManager) {
 
       this.stateManager = requireNonNull(stateManager);
       this.filter = requireNonNull(filter);
       this.taskFactory = requireNonNull(taskFactory);
+      this.offerManager = requireNonNull(offerManager);
     }
 
     private TaskInfo assign(
@@ -225,26 +130,61 @@ public interface TaskAssigner {
     }
 
     @Override
-    public Assignment maybeAssign(
+    public boolean maybeAssign(
         MutableStoreProvider storeProvider,
-        HostOffer offer,
         ResourceRequest resourceRequest,
-        String taskId) {
-
-      Set<Veto> vetoes = filter.filter(
-          new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
-          resourceRequest);
-      if (vetoes.isEmpty()) {
-        return Assignment.success(assign(
-            storeProvider,
-            offer.getOffer(),
-            resourceRequest.getRequestedPorts(),
-            taskId));
-      } else {
-        LOG.fine("Slave " + offer.getOffer().getHostname()
-            + " vetoed task " + taskId + ": " + vetoes);
-        return Assignment.failure(vetoes);
+        TaskGroupKey groupKey,
+        String taskId,
+        Optional<String> slaveReservation) {
+
+      for (HostOffer offer : offerManager.getOffers(groupKey)) {
+        if (slaveReservation.isPresent()
+            && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue())) {
+          // Task group has a slave reserved but this offer is for a different slave -> skip.
+          continue;
+        }
+        Set<Veto> vetoes = filter.filter(
+            new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+            resourceRequest);
+        if (vetoes.isEmpty()) {
+          TaskInfo taskInfo = assign(
+              storeProvider,
+              offer.getOffer(),
+              resourceRequest.getRequestedPorts(),
+              taskId);
+
+          try {
+            offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+            return true;
+          } catch (OfferManager.LaunchException e) {
+            LOG.log(Level.WARNING, "Failed to launch task.", e);
+            launchFailures.incrementAndGet();
+
+            // The attempt to schedule the task failed, so we need to backpedal on the
+            // assignment.
+            // It is in the LOST state and a new task will move to PENDING to replace it.
+            // Should the state change fail due to storage issues, that's okay.  The task will
+            // time out in the ASSIGNED state and be moved to LOST.
+            stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.of(PENDING),
+                LOST,
+                LAUNCH_FAILED_MSG);
+            return false;
+          }
+        } else {
+          if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
+            // Never attempt to match this offer/groupKey pair again.
+            offerManager.banOffer(offer.getOffer().getId(), groupKey);
+          }
+
+          LOG.fine("Slave " + offer.getOffer().getHostname()
+              + " vetoed task " + taskId + ": " + vetoes);
+          // Fall through to the next offer; a veto here says nothing about the remaining ones.
+        }
       }
+      return false;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 04be32e..088a4a6 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -16,7 +16,7 @@ package org.apache.aurora.scheduler.offers;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 
-import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.testing.TearDown;
@@ -31,32 +31,34 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.TaskInfo;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class OfferManagerImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
   private static final String HOST_A = "HOST_A";
+  private static final IHostAttributes HOST_ATTRIBUTES_A =
+      IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A));
   private static final HostOffer OFFER_A = new HostOffer(
       Offers.makeOffer("OFFER_A", HOST_A),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+      HOST_ATTRIBUTES_A);
+  private static final Protos.OfferID OFFER_A_ID = OFFER_A.getOffer().getId();
   private static final String HOST_B = "HOST_B";
   private static final HostOffer OFFER_B = new HostOffer(
       Offers.makeOffer("OFFER_B", HOST_B),
@@ -67,10 +69,10 @@ public class OfferManagerImplTest extends EasyMockTest {
       IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
       ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
+  private static final TaskInfo TASK_INFO = TaskInfo.getDefaultInstance();
 
   private Driver driver;
   private FakeScheduledExecutor clock;
-  private Function<HostOffer, Assignment> offerAcceptor;
   private OfferManagerImpl offerManager;
 
   @Before
@@ -92,7 +94,6 @@ public class OfferManagerImplTest extends EasyMockTest {
         clock.assertEmpty();
       }
     });
-    offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { });
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
       public Amount<Long, Time> get() {
@@ -109,11 +110,9 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer offerA = setMode(OFFER_A, DRAINING);
     HostOffer offerC = setMode(OFFER_C, DRAINING);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
+    driver.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
 
-    driver.declineOffer(offerA.getOffer().getId());
+    driver.declineOffer(OFFER_A_ID);
     driver.declineOffer(offerC.getOffer().getId());
 
     control.replay();
@@ -121,98 +120,165 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.addOffer(offerA);
     offerManager.addOffer(OFFER_B);
     offerManager.addOffer(offerC);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(
+        ImmutableSet.of(OFFER_B, offerA, offerC),
+        ImmutableSet.copyOf(offerManager.getOffers()));
+    offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testGetOffersReturnsAllOffers() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+  public void hostAttributeChangeUpdatesOfferSorting() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+    driver.declineOffer(OFFER_B.getOffer().getId());
 
     control.replay();
 
+    offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
+
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.addOffer(OFFER_B);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
 
-    offerManager.cancelOffer(OFFER_A.getOffer().getId());
-    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+    HostOffer offerA = setMode(OFFER_A, DRAINING);
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+    assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers()));
+
+    offerA = setMode(OFFER_A, NONE);
+    HostOffer offerB = setMode(OFFER_B, DRAINING);
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testOfferFilteringDueToStaticBan() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+  public void testAddSameSlaveOffer() {
+    driver.declineOffer(OFFER_A_ID);
+    expectLastCall().times(2);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
+    control.replay();
 
-    driver.declineOffer(OFFER_A.getOffer().getId());
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_A);
+
+    clock.advance(RETURN_DELAY);
+  }
 
+  @Test
+  public void testGetOffersReturnsAllOffers() throws Exception {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    // Run again to make sure all offers are banned (via no expectations set).
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
 
-    // Add a new offer to accept the task previously banned for OFFER_A.
-    offerManager.addOffer(OFFER_B);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.cancelOffer(OFFER_A_ID);
+    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testStaticBanIsCleared() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
+  public void testOfferFilteringDueToStaticBan() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
+    control.replay();
 
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
+    // Static ban is ignored when there are no offers.
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    offerManager.addOffer(OFFER_A);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
 
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
 
-    driver.declineOffer(OFFER_A.getOffer().getId());
+    // Add static ban.
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsClearedOnOfferReturn() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+    expectLastCall().times(2);
 
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
 
     // Make sure the static ban is cleared when the offers are returned.
     clock.advance(RETURN_DELAY);
     offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsClearedOnDriverDisconnect() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+
+    control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
 
     // Make sure the static ban is cleared when driver is disconnected.
     offerManager.driverDisconnected(new DriverDisconnected());
     offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
+  public void getOffer() {
+    driver.declineOffer(OFFER_A_ID);
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getSlaveId()));
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test(expected = OfferManager.LaunchException.class)
+  public void testLaunchTaskDriverThrows() throws OfferManager.LaunchException {
+    driver.launchTask(OFFER_A_ID, TASK_INFO);
+    expectLastCall().andThrow(new IllegalStateException());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+
+    try {
+      offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+    } finally {
+      clock.advance(RETURN_DELAY);
+    }
+  }
+
+  @Test(expected = OfferManager.LaunchException.class)
+  public void testLaunchTaskOfferRaceThrows() throws OfferManager.LaunchException {
+    control.replay();
+    offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+  }
+
+  @Test
   public void testFlushOffers() throws Exception {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     offerManager.addOffer(OFFER_B);
     offerManager.driverDisconnected(new DriverDisconnected());
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
     clock.advance(RETURN_DELAY);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index a2e2d4c..350ec6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -24,10 +24,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -37,27 +34,19 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.Offers;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
@@ -67,10 +56,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -78,34 +65,22 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A =
       TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
-  private static final IScheduledTask TASK_B =
-      TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b"));
-  private static final HostOffer OFFER = new HostOffer(
-      Offers.makeOffer("OFFER_A", "HOST_A"),
-      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-
-  private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue();
-
-  private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
-  private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
+  private static final String SLAVE_ID = "HOST_A";
+  private static final Optional<String> NO_RESERVATION = Optional.absent();
 
   private StorageTestUtil storageUtil;
-  private StateManager stateManager;
   private TaskAssigner assigner;
-  private OfferManager offerManager;
   private TaskScheduler scheduler;
   private Preemptor preemptor;
-  private BiCache<String, TaskGroupKey> reservations;
+  private BiCache<TaskGroupKey, String> reservations;
   private EventSink eventSink;
 
   @Before
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
-    stateManager = createMock(StateManager.class);
     assigner = createMock(TaskAssigner.class);
-    offerManager = createMock(OfferManager.class);
     preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
+    reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { });
 
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
@@ -118,11 +93,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
+            bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations);
             bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
             bind(Preemptor.class).toInstance(preemptor);
-            bind(OfferManager.class).toInstance(offerManager);
-            bind(StateManager.class).toInstance(stateManager);
             bind(TaskAssigner.class).toInstance(assigner);
             bind(Clock.class).toInstance(createMock(Clock.class));
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
@@ -138,89 +111,100 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         ImmutableSet.of(task));
   }
 
-  private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(
+  private IExpectationSetters<Boolean> expectAssigned(
+      IScheduledTask task,
+      Optional<String> reservation) {
+
+    return expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
-        OFFER,
         new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
-        Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+        TaskGroupKey.from(task.getAssignedTask().getTask()),
+        Tasks.id(task),
+        reservation));
+  }
+
+  @Test
+  public void testSchedule() throws Exception {
+    storageUtil.expectOperations();
+
+    expectReservationCheck(TASK_A);
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
+
+    control.replay();
+
+    assertTrue(scheduler.schedule("a"));
+  }
+
+  @Test
+  public void testScheduleNoTask() throws Exception {
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING),
+        ImmutableSet.of());
+
+    control.replay();
+
+    assertTrue(scheduler.schedule("a"));
   }
 
   @Test
   public void testReservation() throws Exception {
     storageUtil.expectOperations();
 
+    // No reservation available in preemptor
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
-    expectReservationCheck(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectReservationCheck(TASK_A).times(2);
+    expectPreemptorCall(TASK_A, NO_RESERVATION);
+
+    // Slave is reserved.
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectReservationCheck(TASK_A).times(2);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
+    expectAddReservation(TASK_A, SLAVE_ID);
 
     // Use previously created reservation.
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectAssigned(TASK_A);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
+    expectGetReservation(TASK_A, SLAVE_ID);
+    expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true);
 
     control.replay();
 
     assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("a"));
     assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   @Test
-  public void testReservationExpires() throws Exception {
+  public void testReservationUnusable() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
     expectReservationCheck(TASK_A);
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
-
-    // First attempt -> reservation is active.
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectReservationCheck(TASK_B);
-    expectPreemptorCall(TASK_B, Optional.absent());
-
-    // Status changed -> reservation removed.
-    reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
-
-    // Second attempt -> reservation expires.
-    expectGetNoReservation(SLAVE_ID);
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_B);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
     assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("b"));
-    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
-
-    eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING));
-    assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
   }
 
   @Test
-  public void testReservationUnusable() throws Exception {
+  public void testReservationRemoved() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
-    expectLaunchAttempt(false);
-    expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of(SLAVE_ID));
+    expectActiveJobFetch(TASK_A);
+    expectReservationCheck(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
@@ -236,17 +220,18 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   @Test
   public void testPendingDeletedHandled() throws Exception {
+    reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID);
+
     control.replay();
 
-    IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
-    eventSink.post(TaskStateChange.transition(task, PENDING));
+    ScheduledTask taskBuilder = TASK_A.newBuilder().setStatus(PENDING);
+    taskBuilder.getAssignedTask().setSlaveId(SLAVE_ID);
+    eventSink.post(TaskStateChange.transition(IScheduledTask.build(taskBuilder), PENDING));
   }
 
   @Test
   public void testIgnoresThrottledTasks() throws Exception {
-    // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
-    // to the assigner function.
-
+    // Ensures that tasks in THROTTLED state are not considered part of the active job state.
     Storage memStorage = DbUtil.createStorage();
 
     Injector injector = getInjector(memStorage);
@@ -265,23 +250,31 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
-    expectGetNoReservation(SLAVE_ID);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
+    expectReservationCheck(TASK_A);
     expect(assigner.maybeAssign(
         EasyMock.anyObject(),
-        eq(OFFER),
         eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
-        eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+        eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
+        eq(Tasks.id(taskA)),
+        eq(NO_RESERVATION))).andReturn(true);
 
     control.replay();
 
     assertTrue(scheduler.schedule(Tasks.id(taskA)));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
-  private static class AssignmentCapture {
-    public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
-    public Capture<TaskGroupKey> groupKey = createCapture();
+  @Test
+  public void testScheduleThrows() throws Exception {
+    storageUtil.expectOperations();
+
+    expectReservationCheck(TASK_A);
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected"));
+
+    control.replay();
+
+    assertFalse(scheduler.schedule("a"));
   }
 
   private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
@@ -291,31 +284,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         storageUtil.mutableStoreProvider)).andReturn(result);
   }
 
-  private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
-      throws OfferManager.LaunchException {
-
-    AssignmentCapture capture = new AssignmentCapture();
-    expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
-        .andReturn(taskLaunched);
-    return capture;
-  }
-
-  private IScheduledTask assign(IScheduledTask task, String slaveId) {
-    ScheduledTask result = task.newBuilder();
-    result.getAssignedTask().setSlaveId(slaveId);
-    return IScheduledTask.build(result);
-  }
-
-  private void assignAndAssert(
-      Result result,
-      TaskGroupKey groupKey,
-      HostOffer offer,
-      AssignmentCapture capture) {
-
-    assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
-    assertEquals(groupKey, capture.groupKey.getValue());
-  }
-
   private void expectActiveJobFetch(IScheduledTask task) {
     storageUtil.expectTaskFetch(
         Query.jobScoped(((Function<IScheduledTask, IJobKey>) Tasks::getJob).apply(task))
@@ -323,21 +291,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         ImmutableSet.of());
   }
 
-  private void expectAddReservation(String slaveId, IScheduledTask task) {
-    reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
-  }
-
-  private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) {
-    return expect(reservations.get(slaveId))
-        .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
+  private void expectAddReservation(IScheduledTask task, String slaveId) {
+    reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId);
   }
 
-  private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
-    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
+  private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) {
+    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(Optional.of(slaveId));
   }
 
   private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of());
+    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(Optional.<String>absent());
   }
 }