You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/21 23:46:48 UTC

[1/2] incubator-aurora git commit: Extract a cluster state abstraction from PreemptorImpl.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 91accd62f -> ecc3fbcac


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
new file mode 100644
index 0000000..65581ba
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -0,0 +1,666 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import static org.apache.mesos.Protos.Offer;
+import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorImplTest extends EasyMockTest {
+
+  private static final String USER_A = "user_a"; // passed as the role argument of makeTask(...)
+  private static final String USER_B = "user_b";
+  private static final String USER_C = "user_c";
+  private static final String JOB_A = "job_a";
+  private static final String JOB_B = "job_b";
+  private static final String JOB_C = "job_c";
+  private static final String TASK_ID_A = "task_a";
+  private static final String TASK_ID_B = "task_b";
+  private static final String TASK_ID_C = "task_c";
+  private static final String TASK_ID_D = "task_d";
+  private static final String HOST_A = "host_a";
+  private static final String RACK_A = "rackA";
+  private static final String RACK_ATTRIBUTE = "rack"; // attribute name used by rack(...)
+  private static final String HOST_ATTRIBUTE = "host"; // attribute name used by host(...)
+  private static final String OFFER_A = "offer_a";
+
+  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); // given to PreemptorImpl; tests advance the clock by it
+
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private SchedulingFilter schedulingFilter; // not set in setUp(); each test assigns a mock or a real filter
+  private FakeClock clock;
+  private StatsProvider statsProvider;
+  private OfferQueue offerQueue;
+  private AttributeAggregate emptyJob; // aggregate over an empty task set, see setUp()
+
+  @Before
+  public void setUp() { // rebuilds the fixture before every test; schedulingFilter is left unassigned here
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    stateManager = createMock(StateManager.class);
+    clock = new FakeClock();
+    statsProvider = new FakeStatsProvider();
+    offerQueue = createMock(OfferQueue.class);
+    emptyJob = new AttributeAggregate( // supplier yields an empty task set; the attribute store is a mock
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
+  }
+
+  private void runPreemptor(ScheduledTask pendingTask) { // builds a PreemptorImpl from the fixture and runs it for pendingTask
+    PreemptorImpl preemptor = new PreemptorImpl(
+        storageUtil.storage,
+        stateManager,
+        offerQueue,
+        schedulingFilter,
+        PREEMPTION_DELAY,
+        clock,
+        statsProvider,
+        new LiveClusterState(storageUtil.storage)); // cluster state reads from the same mocked storage
+
+    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob); // return value is discarded
+  }
+
+  // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
+
+  private void expectGetPendingTasks(ScheduledTask... returnedTasks) { // expects a PENDING-scoped fetch by the given tasks' ids
+    Iterable<String> taskIds = FluentIterable.from(Arrays.asList(returnedTasks))
+        .transform(IScheduledTask.FROM_BUILDER)
+        .transform(Tasks.SCHEDULED_TO_ID);
+    storageUtil.expectTaskFetch(
+        Query.statusScoped(PENDING).byId(taskIds),
+        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks))); // the fetch returns the same tasks
+  }
+
+  private void expectGetActiveTasks(ScheduledTask... returnedTasks) { // expects the LiveClusterState candidate query
+    storageUtil.expectTaskFetch(
+        LiveClusterState.CANDIDATE_QUERY,
+        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks))); // built as a set from the given builders
+  }
+
+  @Test
+  public void testPreempted() throws Exception { // a pending priority-100 task preempts a running task with no explicit priority
+    setUpHost(HOST_A, RACK_A);
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
+    runOnHost(lowPriority, HOST_A);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(highPriority);
+    expectGetActiveTasks(lowPriority);
+
+    expectFiltering();
+    expectPreempted(lowPriority);
+
+    control.replay();
+    runPreemptor(highPriority);
+  }
+
+  @Test
+  public void testLowestPriorityPreempted() throws Exception { // expects the priority-1 task to be the one preempted
+    setUpHost(HOST_A, RACK_A);
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
+    runOnHost(lowPriority, HOST_A);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
+    runOnHost(lowerPriority, HOST_A);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(highPriority);
+    expectGetActiveTasks(lowerPriority, lowerPriority); // NOTE(review): same task listed twice, lowPriority is never returned - confirm intent
+
+    expectFiltering();
+    expectPreempted(lowerPriority);
+
+    control.replay();
+    runPreemptor(highPriority);
+  }
+
+  @Test
+  public void testOnePreemptableTask() throws Exception { // of three running tasks only the priority-1 one is expected to be preempted
+    setUpHost(HOST_A, RACK_A);
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
+    runOnHost(highPriority, HOST_A);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
+    runOnHost(lowerPriority, HOST_A);
+
+    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
+    runOnHost(lowestPriority, HOST_A);
+
+    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98); // below the running 100 and 99, above the running 1
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(pendingPriority);
+    expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
+
+    expectFiltering();
+    expectPreempted(lowestPriority);
+
+    control.replay();
+    runPreemptor(pendingPriority);
+  }
+
+  @Test
+  public void testHigherPriorityRunning() throws Exception { // records no expectPreempted(...): no preemption is expected
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+    runOnHost(highPriority, HOST_A);
+
+    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A); // pending task without an explicit priority
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(task);
+    expectGetActiveTasks(highPriority);
+
+    control.replay();
+    runPreemptor(task);
+  }
+
+  @Test
+  public void testProductionPreemptingNonproduction() throws Exception { // pending production task vs. running non-production task, same role
+    setUpHost(HOST_A, RACK_A);
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
+    runOnHost(a1, HOST_A);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(p1);
+    expectGetActiveTasks(a1);
+
+    expectFiltering();
+    expectPreempted(a1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  @Test
+  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception { // same scenario, but the victim belongs to USER_B
+    setUpHost(HOST_A, RACK_A);
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
+    runOnHost(a1, HOST_A);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(p1);
+    expectGetActiveTasks(a1);
+
+    expectFiltering();
+    expectPreempted(a1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  @Test
+  public void testProductionUsersDoNotPreemptEachOther() throws Exception { // both tasks are production; no expectPreempted(...) is recorded
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
+    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
+    runOnHost(a1, HOST_A);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(p1);
+    expectGetActiveTasks(a1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  // Ensures a production task can preempt 2 tasks on the same host.
+  @Test
+  public void testProductionPreemptingManyNonProduction() throws Exception { // uses the real SchedulingFilterImpl, not a mock
+    schedulingFilter = new SchedulingFilterImpl();
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost(HOST_A, RACK_A);
+
+    runOnHost(a1, HOST_A);
+    runOnHost(b1, HOST_A);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); // equals the CPU and RAM of a1 and b1 combined
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(p1);
+    expectGetActiveTasks(a1, b1);
+
+    expectPreempted(a1); // both running tasks are expected to be preempted
+    expectPreempted(b1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  // Ensures we select the minimal number of tasks to preempt
+  @Test
+  public void testMinimalSetPreempted() throws Exception { // uses the real SchedulingFilterImpl, not a mock
+    schedulingFilter = new SchedulingFilterImpl();
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); // alone larger than the pending task's request
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
+    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost(HOST_A, RACK_A);
+
+    runOnHost(a1, HOST_A);
+    runOnHost(b1, HOST_A);
+    runOnHost(b2, HOST_A);
+
+    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetPendingTasks(p1);
+    expectGetActiveTasks(b1, b2, a1);
+
+    expectPreempted(a1); // only a1 is expected to change state; b1 and b2 are not
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  // Ensures a production task *never* preempts a production task from another job.
+  @Test
+  public void testProductionJobNeverPreemptsProductionJob() throws Exception { // no expectPreempted(...) is recorded
+    schedulingFilter = new SchedulingFilterImpl();
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    setUpHost(HOST_A, RACK_A);
+
+    runOnHost(p1, HOST_A);
+
+    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
+    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); // smaller than the running production task p1
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectNoOffers();
+
+    expectGetActiveTasks(p1);
+    expectGetPendingTasks(p2);
+
+    control.replay();
+    runPreemptor(p2);
+  }
+
+  // Ensures that we can preempt if a task + offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndTask() throws Exception { // the offer and a1 together match p1's CPU and RAM
+    schedulingFilter = new SchedulingFilterImpl();
+
+    setUpHost(HOST_A, RACK_A);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    runOnHost(a1, HOST_A);
+
+    Offer o1 = makeOffer(OFFER_A, HOST_A, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1); // 1 cpu, 512 MB ram, 1 MB disk, 1 port
+    expectOffers(o1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectGetActiveTasks(a1);
+    expectGetPendingTasks(p1);
+
+    expectPreempted(a1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndMultipleTasks() throws Exception { // the offer, a1 and a2 together match p1's CPU and RAM
+    schedulingFilter = new SchedulingFilterImpl();
+
+    setUpHost(HOST_A, RACK_A);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    runOnHost(a1, HOST_A);
+
+    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
+    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    runOnHost(a2, HOST_A);
+
+    Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1); // 2 cpus, 1024 MB ram
+    expectOffers(o1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectGetActiveTasks(a1, a2);
+    expectGetPendingTasks(p1);
+
+    expectPreempted(a1); // both running tasks are expected to be preempted
+    expectPreempted(a2);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
+  @Test
+  public void testPreemptWithLargeOffer() throws Exception { // no expectPreempted(...) is recorded: no preemption is expected
+    schedulingFilter = new SchedulingFilterImpl();
+
+    setUpHost(HOST_A, RACK_A);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    runOnHost(a1, HOST_A);
+
+    Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(2048L, Data.MB), Amount.of(1L, Data.MB), 1); // 2 cpus, 2048 MB ram: exceeds p1's request
+    expectOffers(o1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
+
+    clock.advance(PREEMPTION_DELAY);
+
+    expectGetActiveTasks(a1);
+    expectGetPendingTasks(p1);
+
+    control.replay();
+    runPreemptor(p1);
+  }
+
+  @Test
+  public void testIgnoresThrottledTasks() throws Exception {
+    // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
+    schedulingFilter = createMock(SchedulingFilter.class);
+
+    Storage storage = MemStorage.newEmptyStorage(); // real in-memory storage instead of storageUtil's mocked storage
+
+    final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
+    throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
+
+    storage.write(new MutateWork.NoResult.Quiet() { // stores both tasks directly in the task store
+      @Override
+      protected void execute(MutableStoreProvider store) {
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+            IScheduledTask.build(pending),
+            IScheduledTask.build(throttled)));
+      }
+    });
+
+    clock.advance(PREEMPTION_DELAY);
+
+    control.replay(); // no expectations are recorded on offerQueue, stateManager or schedulingFilter in this test
+
+    PreemptorImpl preemptor = new PreemptorImpl( // built inline because runPreemptor(...) always uses storageUtil.storage
+        storage,
+        stateManager,
+        offerQueue,
+        schedulingFilter,
+        PREEMPTION_DELAY,
+        clock,
+        statsProvider,
+        new LiveClusterState(storage));
+
+    assertEquals( // no preemption slot is expected for the pending task
+        Optional.<String>absent(),
+        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
+  }
+
+  // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
+
+  private Offer makeOffer(String offerId, // builds a Mesos Offer for the given host and resources
+                          String host,
+                          double cpu,
+                          Amount<Long, Data> ram,
+                          Amount<Long, Data> disk,
+                          int numPorts) {
+    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+    Offer.Builder builder = Offer.newBuilder();
+    builder.getIdBuilder().setValue(offerId);
+    builder.getFrameworkIdBuilder().setValue("framework-id"); // fixed framework id for all test offers
+    builder.getSlaveIdBuilder().setValue(hostToId(host)); // same slave id scheme runOnHost(...) applies to tasks
+    builder.setHostname(host);
+    for (Resource r: resources) {
+      builder.addResources(r);
+    }
+    return builder.build();
+  }
+
+  private void expectOffers(Offer ... offers) { // expects offerQueue.getOffers() to return the given offers as HostOffers
+    Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
+        .transform(new Function<Offer, HostOffer>() {
+          @Override
+          public HostOffer apply(Offer offer) {
+            return new HostOffer(
+                offer,
+                IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); // only the mode is set on the attributes
+          }
+        });
+    expect(offerQueue.getOffers()).andReturn(hostOffers);
+  }
+
+  private void expectNoOffers() { // expects offerQueue.getOffers() to return an empty list
+    expect(offerQueue.getOffers()).andReturn(ImmutableList.<HostOffer>of());
+  }
+
+  private IExpectationSetters<Set<Veto>> expectFiltering() { // expects filter(...) with any arguments; returns the setter to the caller
+    return expect(schedulingFilter.filter(
+        EasyMock.<UnusedResource>anyObject(),
+        EasyMock.<ResourceRequest>anyObject()))
+        .andAnswer(
+            new IAnswer<Set<Veto>>() {
+            @Override
+            public Set<Veto> answer() {
+              return ImmutableSet.of(); // answers with an empty veto set
+            }
+          });
+  }
+
+  private void expectPreempted(ScheduledTask preempted) throws Exception { // expects a changeState(...) call moving the task to PREEMPTING
+    expect(stateManager.changeState(
+        eq(storageUtil.mutableStoreProvider),
+        eq(Tasks.id(preempted)),
+        eq(Optional.<ScheduleStatus>absent()),
+        eq(ScheduleStatus.PREEMPTING),
+        EasyMock.<Optional<String>>anyObject())) // the last argument is not checked
+        .andReturn(true);
+  }
+
+  private ScheduledTask makeProductionTask(String role, String job, String taskId) { // priority 0, env "prod", production = true
+    return makeTask(role, job, taskId, 0, "prod", true);
+  }
+
+  private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) { // env "prod", production = true
+    return makeTask(role, job, taskId, priority, "prod", true);
+  }
+
+  private ScheduledTask makeTask(String role, String job, String taskId, int priority, // builds a PENDING task builder
+                                 String env, boolean production) {
+    AssignedTask assignedTask = new AssignedTask()
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey(role, env, job))
+            .setOwner(new Identity(role, role)) // role is passed for both Identity arguments
+            .setPriority(priority)
+            .setProduction(production)
+            .setJobName(job)
+            .setEnvironment(env)
+            .setConstraints(new HashSet<Constraint>())); // no constraints
+    ScheduledTask scheduledTask = new ScheduledTask()
+        .setStatus(PENDING)
+        .setAssignedTask(assignedTask);
+    addEvent(scheduledTask, PENDING); // PENDING event stamped with the fake clock's current time
+    return scheduledTask;
+  }
+
+  private ScheduledTask makeTask(String role, String job, String taskId) { // priority 0, env "dev", production = false
+    return makeTask(role, job, taskId, 0, "dev", false);
+  }
+
+  private ScheduledTask makeTask(String role, String job, String taskId, int priority) { // env "dev", production = false
+    return makeTask(role, job, taskId, priority, "dev", false);
+  }
+
+  private void addEvent(ScheduledTask task, ScheduleStatus status) { // appends a TaskEvent timestamped with clock.nowMillis()
+    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
+  }
+
+  // Slave Hostname to a slave id
+  private String hostToId(String hostname) { // slave id is the hostname with "_id" appended
+    return hostname + "_id";
+  }
+
+  private void runOnHost(ScheduledTask task, String host) { // mutates the builder: marks it RUNNING and assigns it to host
+    task.setStatus(RUNNING);
+    addEvent(task, RUNNING);
+    task.getAssignedTask().setSlaveHost(host);
+    task.getAssignedTask().setSlaveId(hostToId(host));
+  }
+
+  private Attribute host(String host) { // single-valued attribute named HOST_ATTRIBUTE
+    return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
+  }
+
+  private Attribute rack(String rack) { // single-valued attribute named RACK_ATTRIBUTE
+    return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
+  }
+
+  // Sets up a normal host, no dedicated hosts and no maintenance.
+  private void setUpHost(String host, String rack) { // expects attribute lookups for host, any number of times
+    IHostAttributes hostAttrs = IHostAttributes.build(
+        new HostAttributes().setHost(host).setSlaveId(host + "_id") // NOTE(review): duplicates hostToId(host); keep the two in sync
+            .setMode(NONE).setAttributes(ImmutableSet.of(rack(rack), host(host))));
+
+    expect(this.storageUtil.attributeStore.getHostAttributes(host))
+        .andReturn(Optional.of(hostAttrs)).anyTimes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
new file mode 100644
index 0000000..020b671
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.twitter.common.application.StartupStage;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorModuleTest extends EasyMockTest {
+
+  private StorageTestUtil storageUtil;
+
+  @Before
+  public void setUp() { // creates the storage test utility and registers its operation expectations before each test
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+  }
+
+  private Injector createInjector(Module module) { // combines the module under test with LifecycleModule and test bindings
+    return Guice.createInjector(
+        module,
+        new LifecycleModule(),
+        new AbstractModule() {
+          private <T> void bindMock(Class<T> clazz) { // binds clazz to a mock created by this test
+            bind(clazz).toInstance(createMock(clazz));
+          }
+
+          @Override
+          protected void configure() {
+            bindMock(SchedulingFilter.class);
+            bindMock(StateManager.class);
+            bindMock(TaskAssigner.class);
+            bindMock(Thread.UncaughtExceptionHandler.class);
+            bind(Storage.class).toInstance(storageUtil.storage); // storage comes from StorageTestUtil rather than bindMock
+          }
+        });
+  }
+
+  @Test
+  public void testPreemptorDisabled() throws Exception { // PreemptorModule(false): the bound Preemptor must return absent
+    Injector injector = createInjector(new PreemptorModule(false));
+
+    Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
+        createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
+    AttributeStore attributeStore = createMock(AttributeStore.class);
+
+    control.replay(); // no expectations are recorded on taskSupplier or attributeStore
+
+    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); // runs the startup-stage command
+
+    injector.getBindings(); // NOTE(review): the returned map is discarded - confirm whether this call is needed
+    assertEquals(
+        Optional.<String>absent(),
+        injector.getInstance(Preemptor.class)
+            .findPreemptionSlotFor("a", new AttributeAggregate(taskSupplier, attributeStore)));
+  }
+}


[2/2] incubator-aurora git commit: Extract a cluster state abstraction from PreemptorImpl.

Posted by wf...@apache.org.
Extract a cluster state abstraction from PreemptorImpl.

Bugs closed: AURORA-121

Reviewed at https://reviews.apache.org/r/28310/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ecc3fbca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ecc3fbca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ecc3fbca

Branch: refs/heads/master
Commit: ecc3fbcacd5497c9da3695f1d04677a542958137
Parents: 91accd6
Author: Bill Farner <wf...@apache.org>
Authored: Fri Nov 21 14:42:28 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 21 14:42:28 2014 -0800

----------------------------------------------------------------------
 .../apache/aurora/scheduler/app/AppModule.java  |   2 +
 .../aurora/scheduler/async/AsyncModule.java     |  61 +-
 .../aurora/scheduler/async/Preemptor.java       | 434 ------------
 .../aurora/scheduler/async/TaskScheduler.java   |   1 +
 .../scheduler/async/preemptor/ClusterState.java |  34 +
 .../async/preemptor/LiveClusterState.java       |  68 ++
 .../scheduler/async/preemptor/Preemptor.java    | 396 +++++++++++
 .../async/preemptor/PreemptorModule.java        |  84 +++
 .../aurora/scheduler/async/AsyncModuleTest.java |  31 +-
 .../scheduler/async/PreemptorImplTest.java      | 663 ------------------
 .../scheduler/async/TaskSchedulerImplTest.java  |   5 +-
 .../scheduler/async/TaskSchedulerTest.java      |   1 +
 .../async/preemptor/LiveClusterStateTest.java   |  76 +++
 .../async/preemptor/PreemptorImplTest.java      | 666 +++++++++++++++++++
 .../async/preemptor/PreemptorModuleTest.java    |  90 +++
 15 files changed, 1426 insertions(+), 1186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 6f1cf47..360e161 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,6 +47,7 @@ import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
@@ -118,6 +119,7 @@ public class AppModule extends AbstractModule {
     install(new MetadataModule());
     install(new QuotaModule());
     install(new JettyServerModule());
+    install(new PreemptorModule());
     install(new SchedulerDriverModule());
     install(new SchedulerServicesModule());
     install(new SchedulerModule());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 03cbe24..7f2c760 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -31,7 +31,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
-import com.google.inject.Key;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.twitter.common.args.Arg;
@@ -56,7 +55,6 @@ import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSetting
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -64,8 +62,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
 import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
 
 /**
@@ -152,15 +148,6 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
       Arg.create(Amount.of(30, Time.SECONDS));
 
-  @CmdLine(name = "preemption_delay",
-      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
-  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
-      Arg.create(Amount.of(10L, Time.MINUTES));
-
-  @CmdLine(name = "enable_preemptor",
-      help = "Enable the preemptor and preemption")
-  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
-
   @CmdLine(name = "job_update_history_per_job_threshold",
       help = "Maximum number of completed job updates to retain in a job update history.")
   private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
@@ -181,28 +168,11 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> INITIAL_TASK_KILL_RETRY_INTERVAL =
       Arg.create(Amount.of(5L, Time.SECONDS));
 
-  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override
-    public Optional<String> findPreemptionSlotFor(
-        String taskId,
-        AttributeAggregate attributeAggregate) {
-
-      return Optional.absent();
-    }
-  };
-
   @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
       + "trying to satisfy a task preempting another.")
   private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
       Arg.create(Amount.of(3L, Time.MINUTES));
 
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface PreemptionBinding { }
-
-  @VisibleForTesting
-  static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
-
   @CmdLine(name = "executor_gc_interval",
       help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
   private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
@@ -215,17 +185,6 @@ public class AsyncModule extends AbstractModule {
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   private @interface AsyncExecutor { }
 
-  private final boolean enablePreemptor;
-
-  @VisibleForTesting
-  AsyncModule(boolean enablePreemptor) {
-    this.enablePreemptor = enablePreemptor;
-  }
-
-  public AsyncModule() {
-    this(ENABLE_PREEMPTOR.get());
-  }
-
   @VisibleForTesting
   static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
 
@@ -274,22 +233,11 @@ public class AsyncModule extends AbstractModule {
 
         bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
         expose(RescheduleCalculator.class);
-        if (enablePreemptor) {
-          bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
-          bind(PreemptorImpl.class).in(Singleton.class);
-          LOG.info("Preemptor Enabled.");
-        } else {
-          bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
-          LOG.warning("Preemptor Disabled.");
-        }
-        expose(PREEMPTOR_KEY);
-        bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
-            .toInstance(PREEMPTION_DELAY.get());
         bind(TaskGroups.class).in(Singleton.class);
         expose(TaskGroups.class);
       }
     });
-    bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
+    bindTaskScheduler(binder(), RESERVATION_DURATION.get());
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
     install(new PrivateModule() {
@@ -390,15 +338,10 @@ public class AsyncModule extends AbstractModule {
    * well with the MultiBinder that backs the PubSub system.
    */
   @VisibleForTesting
-  static void bindTaskScheduler(
-      Binder binder,
-      final Key<Preemptor> preemptorKey,
-      final Amount<Long, Time> reservationDuration) {
-
+  static void bindTaskScheduler(Binder binder, final Amount<Long, Time> reservationDuration) {
     binder.install(new PrivateModule() {
       @Override
       protected void configure() {
-        bind(Preemptor.class).to(preemptorKey);
         bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
             .toInstance(reservationDuration);
         bind(TaskScheduler.class).to(TaskSchedulerImpl.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
deleted file mode 100644
index 97d5d13..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import static org.apache.aurora.scheduler.storage.Storage.Work;
-
-/**
- * Preempts active tasks in favor of higher priority tasks.
- */
-public interface Preemptor {
-
-  /**
-   * Preempts active tasks in favor of the input task.
-   *
-   * @param taskId ID of the preempting task.
-   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
-   * @return ID of the slave where preemption occured.
-   */
-  Optional<String> findPreemptionSlotFor(String taskId, AttributeAggregate attributeAggregate);
-
-  /**
-   * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
-   * priority than tasks that are currently running.
-   *
-   * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
-   * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
-   * tasks.
-   */
-  class PreemptorImpl implements Preemptor {
-
-    /**
-     * Binding annotation for the time interval after which a pending task becomes eligible to
-     * preempt other tasks.
-     */
-    @Qualifier
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    @interface PreemptionDelay { }
-
-    @VisibleForTesting
-    static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
-        EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
-
-    private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
-    // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
-    private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
-    // Incremented every time we fail to find tasks to preempt for a pending task.
-    private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
-
-    private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
-      @Override
-      public boolean apply(IScheduledTask task) {
-        return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
-            >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
-      }
-    };
-
-    private final Storage storage;
-    private final StateManager stateManager;
-    private final OfferQueue offerQueue;
-    private final SchedulingFilter schedulingFilter;
-    private final Amount<Long, Time> preemptionCandidacyDelay;
-    private final Clock clock;
-    private final AtomicLong missingAttributes;
-
-    /**
-     * Creates a new preemptor.
-     *
-     * @param storage Backing store for tasks.
-     * @param stateManager Scheduler state controller to instruct when preempting tasks.
-     * @param offerQueue Queue that contains available Mesos resource offers.
-     * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
-     * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
-     *                                 tasks.
-     * @param clock Clock to check current time.
-     */
-    @Inject
-    PreemptorImpl(
-        Storage storage,
-        StateManager stateManager,
-        OfferQueue offerQueue,
-        SchedulingFilter schedulingFilter,
-        @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-        Clock clock,
-        StatsProvider statsProvider) {
-
-      this.storage = requireNonNull(storage);
-      this.stateManager = requireNonNull(stateManager);
-      this.offerQueue = requireNonNull(offerQueue);
-      this.schedulingFilter = requireNonNull(schedulingFilter);
-      this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
-      this.clock = requireNonNull(clock);
-      missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
-    }
-
-    private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
-      return Lists.newArrayList(Iterables.transform(Iterables.filter(
-          Storage.Util.consistentFetchTasks(storage, query), filter),
-          SCHEDULED_TO_ASSIGNED));
-    }
-
-    private List<IAssignedTask> fetch(Query.Builder query) {
-      return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
-    }
-
-    private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
-        new Function<IAssignedTask, String>() {
-          @Override
-          public String apply(IAssignedTask input) {
-            return input.getSlaveId();
-          }
-        };
-
-    private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
-        new Function<IAssignedTask, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(IAssignedTask task) {
-            return ResourceSlot.from(task.getTask());
-          }
-        };
-
-    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<HostOffer, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(HostOffer offer) {
-            return ResourceSlot.from(offer.getOffer());
-          }
-        };
-
-    private static final Function<HostOffer, String> OFFER_TO_HOST =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getHostname();
-          }
-        };
-
-    private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
-        new Function<HostOffer, IHostAttributes>() {
-          @Override
-          public IHostAttributes apply(HostOffer offer) {
-            return offer.getAttributes();
-          }
-        };
-
-    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
-    // ordering
-    private static final Ordering<IAssignedTask> RESOURCE_ORDER =
-        ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
-
-    /**
-     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
-     * The empty set indicates the offers (slack) are enough.
-     * A set with elements indicates those tasks and the offers are enough.
-     */
-    private Optional<Set<IAssignedTask>> getTasksToPreempt(
-        Iterable<IAssignedTask> possibleVictims,
-        Iterable<HostOffer> offers,
-        IAssignedTask pendingTask,
-        AttributeAggregate jobState) {
-
-      // This enforces the precondition that all of the resources are from the same host. We need to
-      // get the host for the schedulingFilter.
-      Set<String> hosts = ImmutableSet.<String>builder()
-          .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
-          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
-
-      String host = Iterables.getOnlyElement(hosts);
-
-      ResourceSlot slackResources =
-          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
-
-      if (!Iterables.isEmpty(offers)) {
-        if (Iterables.size(offers) > 1) {
-          // There are multiple offers for the same host. Since both have maintenance information
-          // we don't preempt with this information and wait for mesos to merge the two offers for
-          // us.
-          return Optional.absent();
-        }
-        IHostAttributes attributes = Iterables.getOnlyElement(
-            FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
-
-        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
-            new UnusedResource(slackResources, attributes),
-            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
-
-        if (vetoes.isEmpty()) {
-          return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
-        }
-      }
-
-      FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
-          .filter(Predicates.compose(
-              preemptionFilter(pendingTask.getTask()),
-              Tasks.ASSIGNED_TO_INFO));
-
-      if (preemptableTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
-
-      Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
-
-      for (IAssignedTask victim : sortedVictims) {
-        toPreemptTasks.add(victim);
-
-        ResourceSlot totalResource = ResourceSlot.sum(
-            ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
-            slackResources);
-
-        Optional<IHostAttributes> attributes = getHostAttributes(host);
-        if (!attributes.isPresent()) {
-          missingAttributes.incrementAndGet();
-          continue;
-        }
-
-        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
-            new UnusedResource(totalResource, attributes.get()),
-            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
-
-        if (vetoes.isEmpty()) {
-          return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
-        }
-      }
-      return Optional.absent();
-    }
-
-    private Optional<IHostAttributes> getHostAttributes(final String host) {
-      return storage.weaklyConsistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
-        @Override
-        public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
-          return storeProvider.getAttributeStore().getHostAttributes(host);
-        }
-      });
-    }
-
-    private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getSlaveId().getValue();
-          }
-        };
-
-    /**
-     * Order by production flag (true, then false), subsorting by task ID.
-     * TODO(wfarner): Re-evaluate - what do we gain from sorting by task ID?
-     */
-    private static final Ordering<IAssignedTask> SCHEDULING_ORDER =
-        Ordering.explicit(true, false)
-            .onResultOf(Functions.compose(
-                Functions.forPredicate(Tasks.IS_PRODUCTION),
-                Tasks.ASSIGNED_TO_INFO))
-            .compound(Ordering.natural().onResultOf(Tasks.ASSIGNED_TO_ID));
-
-    private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
-      // Only non-pending active tasks may be preempted.
-      List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
-
-      // Walk through the preemption candidates in reverse scheduling order.
-      Collections.sort(activeTasks, SCHEDULING_ORDER.reverse());
-
-      // Group the tasks by slave id so they can be paired with offers from the same slave.
-      return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
-    }
-
-    @Override
-    public synchronized Optional<String> findPreemptionSlotFor(
-        String taskId,
-        AttributeAggregate attributeAggregate) {
-
-      List<IAssignedTask> pendingTasks =
-          fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
-
-      // Task is no longer PENDING no need to preempt
-      if (pendingTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      final IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
-
-      Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
-
-      if (slavesToActiveTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      attemptedPreemptions.incrementAndGet();
-
-      // Group the offers by slave id so they can be paired with active tasks from the same slave.
-      Multimap<String, HostOffer> slavesToOffers =
-          Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
-
-      Set<String> allSlaves = ImmutableSet.<String>builder()
-          .addAll(slavesToOffers.keySet())
-          .addAll(slavesToActiveTasks.keySet())
-          .build();
-
-      for (String slaveID : allSlaves) {
-        final Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
-            slavesToActiveTasks.get(slaveID),
-            slavesToOffers.get(slaveID),
-            pendingTask,
-            attributeAggregate);
-
-        if (toPreemptTasks.isPresent()) {
-          storage.write(new MutateWork.NoResult.Quiet() {
-            @Override
-            protected void execute(MutableStoreProvider storeProvider) {
-              for (IAssignedTask toPreempt : toPreemptTasks.get()) {
-                stateManager.changeState(
-                    storeProvider,
-                    toPreempt.getTaskId(),
-                    Optional.<ScheduleStatus>absent(),
-                    PREEMPTING,
-                    Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
-                tasksPreempted.incrementAndGet();
-              }
-            }
-          });
-          return Optional.of(slaveID);
-        }
-      }
-
-      noSlotsFound.incrementAndGet();
-      return Optional.absent();
-    }
-
-    /**
-     * Creates a static filter that will identify tasks that may preempt the provided task.
-     * A task may preempt another task if the following conditions hold true:
-     * - The resources reserved for {@code preemptableTask} are sufficient to satisfy the task.
-     * - The tasks are owned by the same user and the priority of {@code preemptableTask} is lower
-     *     OR {@code preemptableTask} is non-production and the compared task is production.
-     *
-     * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
-     * @return A filter that will compare the priorities and resources required by other tasks
-     *     with {@code preemptableTask}.
-     */
-    private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
-      return new Predicate<ITaskConfig>() {
-        @Override
-        public boolean apply(ITaskConfig possibleVictim) {
-          boolean pendingIsProduction = pendingTask.isProduction();
-          boolean victimIsProduction = possibleVictim.isProduction();
-
-          if (pendingIsProduction && !victimIsProduction) {
-            return true;
-          } else if (pendingIsProduction == victimIsProduction) {
-            // If production flags are equal, preemption is based on priority within the same role.
-            if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
-              return pendingTask.getPriority() > possibleVictim.getPriority();
-            } else {
-              return false;
-            }
-          } else {
-            return false;
-          }
-        }
-      };
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 626545a..ead9d28 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -42,6 +42,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
new file mode 100644
index 0000000..4f0019a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+/**
+ * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
+ */
+public interface ClusterState {
+
+  /**
+   * Gets a snapshot of the active tasks in the cluster, indexed by the ID of the slave each
+   * task is assigned to.
+   * <p>
+   * TODO(wfarner): Return a more minimal type than IAssignedTask here.
+   *
+   * @return A multimap from slave ID to the active tasks assigned to that slave.
+   */
+  Multimap<String, IAssignedTask> getSlavesToActiveTasks();
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
new file mode 100644
index 0000000..0da4d2a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.EnumSet;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+class LiveClusterState implements ClusterState {
+  @VisibleForTesting
+  static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
+      new Function<IAssignedTask, String>() {
+        @Override
+        public String apply(IAssignedTask input) {
+          return input.getSlaveId();
+        }
+      };
+
+  @VisibleForTesting
+  static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
+      EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
+
+  private final Storage storage;
+
+  @Inject
+  LiveClusterState(Storage storage) {
+    this.storage = requireNonNull(storage);
+  }
+
+  @Override
+  public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+    // Candidates are tasks in a slave-assigned state that are not already PREEMPTING.
+    Iterable<IAssignedTask> activeTasks = Iterables.transform(
+        Storage.Util.consistentFetchTasks(storage, CANDIDATE_QUERY),
+        SCHEDULED_TO_ASSIGNED);
+
+    // Index the tasks by slave ID so they can be paired with offers from the same slave.
+    return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
new file mode 100644
index 0000000..afbd645
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.Util;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * Preempts active tasks in favor of higher priority tasks.
+ */
+public interface Preemptor {
+
+  /**
+   * Preempts active tasks in favor of the input task.
+   *
+   * @param taskId ID of the preempting task.
+   * @param attributeAggregate Attribute information for tasks in the job containing the task.
+   * @return ID of the slave where preemption occurred, if any.
+   */
+  Optional<String> findPreemptionSlotFor(String taskId, AttributeAggregate attributeAggregate);
+
+  /**
+   * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
+   * priority than tasks that are currently running.
+   * <p>
+   * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
+   * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
+   * tasks.
+   * <p>
+   * TODO(wfarner): Move this class out of the interface to make it package private.
+   */
+  class PreemptorImpl implements Preemptor {
+
+    /**
+     * Binding annotation for the time interval after which a pending task becomes eligible to
+     * preempt other tasks.
+     */
+    @Qualifier
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    @interface PreemptionDelay { }
+
+    private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
+    // Incremented every time the preemptor is invoked and finds a pending task and active tasks.
+    private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
+    // Incremented every time we fail to find tasks to preempt for a pending task.
+    private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
+
+    private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
+      @Override
+      public boolean apply(IScheduledTask task) {
+        return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
+            >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+      }
+    };
+
+    private final Storage storage;
+    private final StateManager stateManager;
+    private final OfferQueue offerQueue;
+    private final SchedulingFilter schedulingFilter;
+    private final Amount<Long, Time> preemptionCandidacyDelay;
+    private final Clock clock;
+    private final AtomicLong missingAttributes;
+    private final ClusterState clusterState;
+
+    /**
+     * Creates a new preemptor.
+     *
+     * @param storage Backing store for tasks.
+     * @param stateManager Scheduler state controller to instruct when preempting tasks.
+     * @param offerQueue Queue that contains available Mesos resource offers.
+     * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
+     * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
+     *                                 tasks.
+     * @param clock Clock to check current time.
+     */
+    @Inject
+    public PreemptorImpl(
+        Storage storage,
+        StateManager stateManager,
+        OfferQueue offerQueue,
+        SchedulingFilter schedulingFilter,
+        @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+        Clock clock,
+        StatsProvider statsProvider,
+        ClusterState clusterState) {
+
+      this.storage = requireNonNull(storage);
+      this.stateManager = requireNonNull(stateManager);
+      this.offerQueue = requireNonNull(offerQueue);
+      this.schedulingFilter = requireNonNull(schedulingFilter);
+      this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
+      this.clock = requireNonNull(clock);
+      missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
+      this.clusterState = requireNonNull(clusterState);
+    }
+
+    private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
+        new Function<IAssignedTask, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(IAssignedTask task) {
+            return ResourceSlot.from(task.getTask());
+          }
+        };
+
+    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<HostOffer, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(HostOffer offer) {
+            return ResourceSlot.from(offer.getOffer());
+          }
+        };
+
+    private static final Function<HostOffer, String> OFFER_TO_HOST =
+        new Function<HostOffer, String>() {
+          @Override
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getHostname();
+          }
+        };
+
+    private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
+        new Function<HostOffer, IHostAttributes>() {
+          @Override
+          public IHostAttributes apply(HostOffer offer) {
+            return offer.getAttributes();
+          }
+        };
+
+    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+    // ordering
+    private static final Ordering<IAssignedTask> RESOURCE_ORDER =
+        ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+
+    /**
+     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
+     * The empty set indicates the offers (slack) are enough.
+     * A set with elements indicates those tasks and the offers are enough.
+     */
+    private Optional<Set<String>> getTasksToPreempt(
+        Iterable<IAssignedTask> possibleVictims,
+        Iterable<HostOffer> offers,
+        IAssignedTask pendingTask,
+        AttributeAggregate jobState) {
+
+      // This enforces the precondition that all of the resources are from the same host. We need to
+      // get the host for the schedulingFilter.
+      Set<String> hosts = ImmutableSet.<String>builder()
+          .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
+          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
+
+      String host = Iterables.getOnlyElement(hosts);
+
+      ResourceSlot slackResources =
+          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
+
+      if (!Iterables.isEmpty(offers)) {
+        if (Iterables.size(offers) > 1) {
+          // There are multiple offers for the same host. Since both have maintenance information
+          // we don't preempt with this information and wait for mesos to merge the two offers for
+          // us.
+          return Optional.absent();
+        }
+        IHostAttributes attributes = Iterables.getOnlyElement(
+            FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
+
+        Set<Veto> vetoes = schedulingFilter.filter(
+            new UnusedResource(slackResources, attributes),
+            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
+
+        if (vetoes.isEmpty()) {
+          return Optional.<Set<String>>of(ImmutableSet.<String>of());
+        }
+      }
+
+      FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
+          .filter(Predicates.compose(
+              preemptionFilter(pendingTask.getTask()),
+              Tasks.ASSIGNED_TO_INFO));
+
+      if (preemptableTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+
+      Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+
+      for (IAssignedTask victim : sortedVictims) {
+        toPreemptTasks.add(victim);
+
+        ResourceSlot totalResource = ResourceSlot.sum(
+            ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+            slackResources);
+
+        Optional<IHostAttributes> attributes = getHostAttributes(host);
+        if (!attributes.isPresent()) {
+          missingAttributes.incrementAndGet();
+          continue;
+        }
+
+        Set<Veto> vetoes = schedulingFilter.filter(
+            new UnusedResource(totalResource, attributes.get()),
+            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
+
+        if (vetoes.isEmpty()) {
+          Set<String> taskIds =
+              FluentIterable.from(toPreemptTasks).transform(Tasks.ASSIGNED_TO_ID).toSet();
+          return Optional.of(taskIds);
+        }
+      }
+      return Optional.absent();
+    }
+
+    private Optional<IHostAttributes> getHostAttributes(final String host) {
+      return storage.weaklyConsistentRead(new Storage.Work.Quiet<Optional<IHostAttributes>>() {
+        @Override
+        public Optional<IHostAttributes> apply(Storage.StoreProvider storeProvider) {
+          return storeProvider.getAttributeStore().getHostAttributes(host);
+        }
+      });
+    }
+
+    private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+        new Function<HostOffer, String>() {
+          @Override
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getSlaveId().getValue();
+          }
+        };
+
+    private Optional<IAssignedTask> fetchIdlePendingTask(String taskId) {
+      Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING);
+      Iterable<IAssignedTask> result = FluentIterable
+          .from(Util.consistentFetchTasks(storage, query))
+          .filter(isIdleTask)
+          .transform(SCHEDULED_TO_ASSIGNED);
+      return Optional.fromNullable(Iterables.getOnlyElement(result, null));
+    }
+
+    @Override
+    public synchronized Optional<String> findPreemptionSlotFor(
+        final String taskId,
+        AttributeAggregate attributeAggregate) {
+
+      final Optional<IAssignedTask> pendingTask = fetchIdlePendingTask(taskId);
+
+      // Task is no longer PENDING or has not yet passed the candidacy delay; no need to preempt.
+      if (!pendingTask.isPresent()) {
+        return Optional.absent();
+      }
+
+      Multimap<String, IAssignedTask> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
+
+      if (slavesToActiveTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      attemptedPreemptions.incrementAndGet();
+
+      // Group the offers by slave id so they can be paired with active tasks from the same slave.
+      Multimap<String, HostOffer> slavesToOffers =
+          Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
+
+      Set<String> allSlaves = ImmutableSet.<String>builder()
+          .addAll(slavesToOffers.keySet())
+          .addAll(slavesToActiveTasks.keySet())
+          .build();
+
+      for (String slaveID : allSlaves) {
+        final Optional<Set<String>> toPreemptTasks = getTasksToPreempt(
+            slavesToActiveTasks.get(slaveID),
+            slavesToOffers.get(slaveID),
+            pendingTask.get(),
+            attributeAggregate);
+
+        if (toPreemptTasks.isPresent()) {
+          storage.write(new Storage.MutateWork.NoResult.Quiet() {
+            @Override
+            protected void execute(Storage.MutableStoreProvider storeProvider) {
+              for (String toPreempt : toPreemptTasks.get()) {
+                stateManager.changeState(
+                    storeProvider,
+                    toPreempt,
+                    Optional.<ScheduleStatus>absent(),
+                    PREEMPTING,
+                    Optional.of("Preempting in favor of " + taskId));
+                tasksPreempted.incrementAndGet();
+              }
+            }
+          });
+          return Optional.of(slaveID);
+        }
+      }
+
+      noSlotsFound.incrementAndGet();
+      return Optional.absent();
+    }
+
+    /**
+     * Creates a static filter that identifies tasks that may be preempted by the provided task.
+     * Only the production flag, role and priority are compared; resources are not checked here.
+     * A task may be preempted if {@code pendingTask} is production and the task is not, OR
+     * both have the same production flag and role, and the priority of {@code pendingTask}
+     *     is higher.
+     *
+     * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
+     * @return A filter that will compare the production flag, role and priority of other tasks
+     *     with {@code pendingTask}.
+     */
+    private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
+      return new Predicate<ITaskConfig>() {
+        @Override
+        public boolean apply(ITaskConfig possibleVictim) {
+          boolean pendingIsProduction = pendingTask.isProduction();
+          boolean victimIsProduction = possibleVictim.isProduction();
+
+          if (pendingIsProduction && !victimIsProduction) {
+            return true;
+          } else if (pendingIsProduction == victimIsProduction) {
+            // If production flags are equal, preemption is based on priority within the same role.
+            if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+              return pendingTask.getPriority() > possibleVictim.getPriority();
+            } else {
+              return false;
+            }
+          } else {
+            return false;
+          }
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
new file mode 100644
index 0000000..bc96b67
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl.PreemptionDelay;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+
+public class PreemptorModule extends PrivateModule {
+
+  private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
+
+  @CmdLine(name = "enable_preemptor",
+      help = "Enable the preemptor and preemption")
+  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+  @CmdLine(name = "preemption_delay",
+      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+      Arg.create(Amount.of(10L, Time.MINUTES));
+
+  private final boolean enablePreemptor;
+
+  @VisibleForTesting
+  PreemptorModule(boolean enablePreemptor) {
+    this.enablePreemptor = enablePreemptor;
+  }
+
+  public PreemptorModule() {
+    this(ENABLE_PREEMPTOR.get());
+  }
+
+  @Override
+  protected void configure() {
+    if (enablePreemptor) {
+      bind(Preemptor.class).to(PreemptorImpl.class);
+      bind(PreemptorImpl.class).in(Singleton.class);
+      bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
+          .toInstance(PREEMPTION_DELAY.get());
+      bind(ClusterState.class).to(LiveClusterState.class);
+      bind(LiveClusterState.class).in(Singleton.class);
+      LOG.info("Preemptor Enabled.");
+    } else {
+      bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+      LOG.warning("Preemptor Disabled.");
+    }
+
+    expose(Preemptor.class);
+  }
+
+  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+    @Override
+    public Optional<String> findPreemptionSlotFor(
+        String taskId,
+        AttributeAggregate attributeAggregate) {
+
+      return Optional.absent();
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index e007c30..4ed6b15 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -15,10 +15,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.Set;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Service;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -26,23 +23,19 @@ import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
-import com.twitter.common.application.StartupStage;
 import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.AppStartup;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
@@ -83,6 +76,7 @@ public class AsyncModuleTest extends EasyMockTest {
             bindMock(Driver.class);
             bindMock(SchedulingFilter.class);
             bindMock(MaintenanceController.class);
+            bindMock(Preemptor.class);
             bindMock(StateManager.class);
             bindMock(TaskAssigner.class);
             bindMock(Thread.UncaughtExceptionHandler.class);
@@ -93,7 +87,7 @@ public class AsyncModuleTest extends EasyMockTest {
 
   @Test
   public void testBindings() throws Exception {
-    Injector injector = createInjector(new AsyncModule(true));
+    Injector injector = createInjector(new AsyncModule());
 
     control.replay();
 
@@ -110,23 +104,4 @@ public class AsyncModuleTest extends EasyMockTest {
         statsProvider.getAllValues()
     );
   }
-
-  @Test
-  public void testPreemptorDisabled() throws Exception {
-    Injector injector = createInjector(new AsyncModule(false));
-
-    Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
-        createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
-    AttributeStore attributeStore = createMock(AttributeStore.class);
-
-    control.replay();
-
-    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
-
-    injector.getBindings();
-    assertEquals(
-        Optional.<String>absent(),
-        injector.getInstance(AsyncModule.PREEMPTOR_KEY)
-            .findPreemptionSlotFor("a", new AttributeAggregate(taskSupplier, attributeStore)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
deleted file mode 100644
index 69108cf..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import static org.apache.mesos.Protos.Offer;
-import static org.apache.mesos.Protos.Resource;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class PreemptorImplTest extends EasyMockTest {
-
-  private static final String USER_A = "user_a";
-  private static final String USER_B = "user_b";
-  private static final String USER_C = "user_c";
-  private static final String JOB_A = "job_a";
-  private static final String JOB_B = "job_b";
-  private static final String JOB_C = "job_c";
-  private static final String TASK_ID_A = "task_a";
-  private static final String TASK_ID_B = "task_b";
-  private static final String TASK_ID_C = "task_c";
-  private static final String TASK_ID_D = "task_d";
-  private static final String HOST_A = "host_a";
-  private static final String RACK_A = "rackA";
-  private static final String RACK_ATTRIBUTE = "rack";
-  private static final String HOST_ATTRIBUTE = "host";
-  private static final String OFFER_A = "offer_a";
-
-  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
-
-  private StorageTestUtil storageUtil;
-  private StateManager stateManager;
-  private SchedulingFilter schedulingFilter;
-  private FakeClock clock;
-  private StatsProvider statsProvider;
-  private OfferQueue offerQueue;
-  private AttributeAggregate emptyJob;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    stateManager = createMock(StateManager.class);
-    clock = new FakeClock();
-    statsProvider = new FakeStatsProvider();
-    offerQueue = createMock(OfferQueue.class);
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
-  }
-
-  private void runPreemptor(ScheduledTask pendingTask) {
-    PreemptorImpl preemptor = new PreemptorImpl(
-        storageUtil.storage,
-        stateManager,
-        offerQueue,
-        schedulingFilter,
-        PREEMPTION_DELAY,
-        clock,
-        statsProvider);
-
-    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
-  }
-
-  // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
-
-  private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
-    Iterable<String> taskIds = FluentIterable.from(Arrays.asList(returnedTasks))
-        .transform(IScheduledTask.FROM_BUILDER)
-        .transform(Tasks.SCHEDULED_TO_ID);
-    storageUtil.expectTaskFetch(
-        Query.statusScoped(PENDING).byId(taskIds),
-        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
-  }
-
-  private void expectGetActiveTasks(ScheduledTask... returnedTasks) {
-    storageUtil.expectTaskFetch(
-        PreemptorImpl.CANDIDATE_QUERY,
-        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
-  }
-
-  @Test
-  public void testPreempted() throws Exception {
-    setUpHost(HOST_A, RACK_A);
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
-    runOnHost(lowPriority, HOST_A);
-
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(highPriority);
-    expectGetActiveTasks(lowPriority);
-
-    expectFiltering();
-    expectPreempted(lowPriority);
-
-    control.replay();
-    runPreemptor(highPriority);
-  }
-
-  @Test
-  public void testLowestPriorityPreempted() throws Exception {  // Priorities 10 and 1 run; 100 pends.
-    setUpHost(HOST_A, RACK_A);
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
-    runOnHost(lowPriority, HOST_A);
-
-    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
-    runOnHost(lowerPriority, HOST_A);
-
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-    // NOTE(review): the active-task fetch supplies lowerPriority twice and never lowPriority; confirm.
-    expectGetPendingTasks(highPriority);
-    expectGetActiveTasks(lowerPriority, lowerPriority);
-    // Only the priority-1 task is expected to be moved to PREEMPTING.
-    expectFiltering();
-    expectPreempted(lowerPriority);
-
-    control.replay();
-    runPreemptor(highPriority);
-  }
-
-  @Test
-  public void testOnePreemptableTask() throws Exception {  // Running 100, 99 and 1; pending 98.
-    setUpHost(HOST_A, RACK_A);
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
-    runOnHost(highPriority, HOST_A);
-
-    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
-    runOnHost(lowerPriority, HOST_A);
-
-    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
-    runOnHost(lowestPriority, HOST_A);
-    // Of the three running tasks, only lowestPriority (1) has a priority below the pending 98.
-    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(pendingPriority);
-    expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
-    // Only lowestPriority is expected to be moved to PREEMPTING.
-    expectFiltering();
-    expectPreempted(lowestPriority);
-
-    control.replay();
-    runPreemptor(pendingPriority);
-  }
-
-  @Test
-  public void testHigherPriorityRunning() throws Exception {  // Running priority 100; pending 0.
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
-    runOnHost(highPriority, HOST_A);
-
-    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-    // Only the two fetches are recorded: no filter call and no PREEMPTING transition is expected.
-    expectGetPendingTasks(task);
-    expectGetActiveTasks(highPriority);
-
-    control.replay();
-    runPreemptor(task);
-  }
-
-  @Test
-  public void testProductionPreemptingNonproduction() throws Exception {
-    setUpHost(HOST_A, RACK_A);
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    // Use a very low priority for the production task to show that priority is irrelevant.
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);  // non-production, same role
-    runOnHost(a1, HOST_A);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
-    // The running non-production task is expected to be moved to PREEMPTING.
-    expectFiltering();
-    expectPreempted(a1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  @Test
-  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
-    setUpHost(HOST_A, RACK_A);
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    // Use a very low priority for the production task to show that priority is irrelevant.
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);  // non-production, USER_B
-    runOnHost(a1, HOST_A);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
-    // USER_B's running non-production task is expected to be moved to PREEMPTING.
-    expectFiltering();
-    expectPreempted(a1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  @Test
-  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
-    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
-    runOnHost(a1, HOST_A);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-    // Both tasks are production; no filter call and no PREEMPTING transition is expected.
-    expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  // Ensures a production task can preempt 2 tasks on the same host.
-  @Test
-  public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
-    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    setUpHost(HOST_A, RACK_A);
-
-    runOnHost(a1, HOST_A);
-    runOnHost(b1, HOST_A);
-    // The pending production task asks for the CPU and RAM of a1 and b1 combined.
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(p1);
-    expectGetActiveTasks(a1, b1);
-    // Both running tasks are expected to be moved to PREEMPTING.
-    expectPreempted(a1);
-    expectPreempted(b1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  // Ensures we select the minimal number of tasks to preempt
-  @Test
-  public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
-
-    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
-    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
-    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    setUpHost(HOST_A, RACK_A);
-
-    runOnHost(a1, HOST_A);
-    runOnHost(b1, HOST_A);
-    runOnHost(b2, HOST_A);
-    // The request (2 CPUs, 1024 MB) matches b1 + b2 combined and is smaller than a1 alone.
-    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-
-    expectGetPendingTasks(p1);
-    expectGetActiveTasks(b1, b2, a1);
-    // Only a1 is expected to be moved to PREEMPTING: one task rather than the two smaller ones.
-    expectPreempted(a1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  // Ensures a production task *never* preempts a production task from another job.
-  @Test
-  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    setUpHost(HOST_A, RACK_A);
-
-    runOnHost(p1, HOST_A);
-    // The pending production task would fit within the resources held by the running p1.
-    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
-    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectNoOffers();
-    // No PREEMPTING transition is expected; only the two fetches are recorded.
-    expectGetActiveTasks(p1);
-    expectGetPendingTasks(p2);
-
-    control.replay();
-    runPreemptor(p2);
-  }
-
-  // Ensures that we can preempt if a task + offer can satisfy a pending task.
-  @Test
-  public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-
-    setUpHost(HOST_A, RACK_A);
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    runOnHost(a1, HOST_A);
-    // The offer on HOST_A has 1 CPU and 512 MB of RAM, the same amounts a1 is using.
-    Offer o1 = makeOffer(OFFER_A, HOST_A, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1);
-    expectOffers(o1);
-    // The pending request (2 CPUs, 1024 MB) equals the offer plus a1 combined.
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectGetActiveTasks(a1);
-    expectGetPendingTasks(p1);
-
-    expectPreempted(a1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
-  @Test
-  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-
-    setUpHost(HOST_A, RACK_A);
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    runOnHost(a1, HOST_A);
-
-    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
-    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    runOnHost(a2, HOST_A);
-    // The offer on HOST_A has 2 CPUs and 1024 MB of RAM, matching a1 + a2 combined.
-    Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
-    expectOffers(o1);
-    // The pending request (4 CPUs, 2048 MB) equals the offer plus both running tasks.
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
-
-    clock.advance(PREEMPTION_DELAY);
-
-    expectGetActiveTasks(a1, a2);
-    expectGetPendingTasks(p1);
-
-    expectPreempted(a1);
-    expectPreempted(a2);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
-  @Test
-  public void testPreemptWithLargeOffer() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl();  // real filter implementation, not a mock
-
-    setUpHost(HOST_A, RACK_A);
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    runOnHost(a1, HOST_A);
-    // The offer alone (2 CPUs, 2048 MB) exceeds the pending request made below (1 CPU, 1024 MB).
-    Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(2048L, Data.MB), Amount.of(1L, Data.MB), 1);
-    expectOffers(o1);
-
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
-
-    clock.advance(PREEMPTION_DELAY);
-    // No PREEMPTING transition is expected; only the two fetches are recorded.
-    expectGetActiveTasks(a1);
-    expectGetPendingTasks(p1);
-
-    control.replay();
-    runPreemptor(p1);
-  }
-
-  @Test
-  public void testIgnoresThrottledTasks() throws Exception {
-    // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
-    schedulingFilter = createMock(SchedulingFilter.class);
-
-    Storage storage = MemStorage.newEmptyStorage();
-
-    final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
-    throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
-    // Both tasks are saved into the MemStorage instance rather than mocked through storageUtil.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
-            IScheduledTask.build(pending),
-            IScheduledTask.build(throttled)));
-      }
-    });
-
-    clock.advance(PREEMPTION_DELAY);
-
-    control.replay();
-    // This preemptor is constructed directly so that it reads from the MemStorage populated above.
-    PreemptorImpl preemptor = new PreemptorImpl(
-        storage,
-        stateManager,
-        offerQueue,
-        schedulingFilter,
-        PREEMPTION_DELAY,
-        clock,
-        statsProvider);
-    // No preemption slot is expected for the pending task.
-    assertEquals(
-        Optional.<String>absent(),
-        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
-  }
-
-  // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
-
-  // Builds an offer for the given host. Its resource list comes from a Resources value built from
-  // the cpu, ram, disk and port-count arguments, and its slave ID comes from hostToId(host).
-  private Offer makeOffer(String offerId,
-                          String host,
-                          double cpu,
-                          Amount<Long, Data> ram,
-                          Amount<Long, Data> disk,
-                          int numPorts) {
-    Offer.Builder offer = Offer.newBuilder().setHostname(host);
-    offer.getIdBuilder().setValue(offerId);
-    offer.getFrameworkIdBuilder().setValue("framework-id");
-    offer.getSlaveIdBuilder().setValue(hostToId(host));
-    offer.addAllResources(new Resources(cpu, ram, disk, numPorts).toResourceList());
-    return offer.build();
-  }
-
-  private void expectOffers(Offer ... offers) {
-    Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
-        .transform(new Function<Offer, HostOffer>() {
-          @Override
-          public HostOffer apply(Offer offer) {
-            return new HostOffer(
-                offer,
-                IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-          }
-        });
-    expect(offerQueue.getOffers()).andReturn(hostOffers);
-  }
-
-  // Records an offerQueue.getOffers() expectation that returns an empty list of offers.
-  private void expectNoOffers() {
-    List<HostOffer> noOffers = ImmutableList.of();
-    expect(offerQueue.getOffers()).andReturn(noOffers);
-  }
-
-  // Records a schedulingFilter.filter() expectation, matching any arguments, that answers with an
-  // empty veto set. The expectation setters are returned so callers can adjust the expectation.
-  private IExpectationSetters<Set<Veto>> expectFiltering() {
-    IAnswer<Set<Veto>> noVetoes = new IAnswer<Set<Veto>>() {
-      @Override
-      public Set<Veto> answer() {
-        return ImmutableSet.of();
-      }
-    };
-    Set<Veto> recordedCall = schedulingFilter.filter(
-        EasyMock.<UnusedResource>anyObject(),
-        EasyMock.<ResourceRequest>anyObject());
-    return expect(recordedCall).andAnswer(noVetoes);
-  }
-
-  // Records a stateManager.changeState() expectation for the given task's ID with an absent status
-  // Optional, target status PREEMPTING and any message Optional; the recorded call returns true.
-  private void expectPreempted(ScheduledTask preempted) throws Exception {
-    String victimId = Tasks.id(preempted);
-    Optional<ScheduleStatus> absentStatus = Optional.absent();
-    expect(stateManager.changeState(
-        eq(storageUtil.mutableStoreProvider),
-        eq(victimId),
-        eq(absentStatus),
-        eq(ScheduleStatus.PREEMPTING),
-        EasyMock.<Optional<String>>anyObject()))
-        .andReturn(true);
-  }
-
-  private ScheduledTask makeProductionTask(String role, String job, String taskId) {
-    return makeTask(role, job, taskId, 0, "prod", true);
-  }
-
-  private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
-    return makeTask(role, job, taskId, priority, "prod", true);
-  }
-
-  // Builds a PENDING task for job key (role, env, job), owned by role, with an empty constraint set
-  // and a single PENDING event stamped by addEvent().
-  private ScheduledTask makeTask(String role, String job, String taskId, int priority,
-                                 String env, boolean production) {
-    TaskConfig config = new TaskConfig()
-        .setJob(new JobKey(role, env, job))
-        .setOwner(new Identity(role, role))
-        .setPriority(priority)
-        .setProduction(production)
-        .setJobName(job)
-        .setEnvironment(env)
-        .setConstraints(new HashSet<Constraint>());
-    ScheduledTask pendingTask = new ScheduledTask()
-        .setStatus(PENDING)
-        .setAssignedTask(new AssignedTask().setTaskId(taskId).setTask(config));
-    addEvent(pendingTask, PENDING);
-    return pendingTask;
-  }
-
-  private ScheduledTask makeTask(String role, String job, String taskId) {
-    return makeTask(role, job, taskId, 0, "dev", false);
-  }
-
-  private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
-    return makeTask(role, job, taskId, priority, "dev", false);
-  }
-
-  // Appends a task event with the given status, timestamped with the test clock's current millis.
-  private void addEvent(ScheduledTask task, ScheduleStatus status) {
-    TaskEvent event = new TaskEvent(clock.nowMillis(), status);
-    task.addToTaskEvents(event);
-  }
-
-  // Maps a slave hostname to its slave ID: the hostname followed by "_id".
-  private String hostToId(String hostname) {
-    return String.format("%s_id", hostname);
-  }
-
-  // Marks the task RUNNING (status plus a RUNNING event) and assigns it to the given host, using
-  // hostToId(host) as the slave ID.
-  private void runOnHost(ScheduledTask task, String host) {
-    task.setStatus(RUNNING);
-    addEvent(task, RUNNING);
-    AssignedTask assigned = task.getAssignedTask();
-    assigned.setSlaveHost(host);
-    assigned.setSlaveId(hostToId(host));
-  }
-
-  // HOST_ATTRIBUTE attribute holding the single given value.
-  private Attribute host(String host) {
-    Set<String> values = ImmutableSet.of(host);
-    return new Attribute(HOST_ATTRIBUTE, values);
-  }
-
-  // RACK_ATTRIBUTE attribute holding the single given value.
-  private Attribute rack(String rack) {
-    Set<String> values = ImmutableSet.of(rack);
-    return new Attribute(RACK_ATTRIBUTE, values);
-  }
-
-  // Sets up a normal host, no dedicated hosts and no maintenance: any number of
-  // attributeStore.getHostAttributes(host) lookups return attributes with mode NONE, the given rack
-  // and host attributes, and the slave ID that runOnHost() and makeOffer() also derive for the host.
-  private void setUpHost(String host, String rack) {
-    IHostAttributes hostAttrs = IHostAttributes.build(
-        new HostAttributes()
-            .setHost(host)
-            // Use the shared helper so this ID cannot drift from the task and offer slave IDs.
-            .setSlaveId(hostToId(host))
-            .setMode(NONE)
-            .setAttributes(ImmutableSet.of(rack(rack), host(host))));
-
-    expect(this.storageUtil.attributeStore.getHostAttributes(host))
-        .andReturn(Optional.of(hostAttrs)).anyTimes();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 17f2d77..5647349 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -37,6 +37,7 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -116,8 +117,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(AsyncModule.PREEMPTOR_KEY).toInstance(preemptor);
-            AsyncModule.bindTaskScheduler(binder(), AsyncModule.PREEMPTOR_KEY, reservationDuration);
+            bind(Preemptor.class).toInstance(preemptor);
+            AsyncModule.bindTaskScheduler(binder(), reservationDuration);
             bind(OfferQueue.class).toInstance(offerQueue);
             bind(StateManager.class).toInstance(stateManager);
             bind(TaskAssigner.class).toInstance(assigner);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 012804a..6cc1323 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -45,6 +45,7 @@ import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
new file mode 100644
index 0000000..8f91ff6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks the slave-ID-to-task multimap that {@link LiveClusterState} derives from storage. */
+public class LiveClusterStateTest extends EasyMockTest {
+
+  private StorageTestUtil storageUtil;
+  private ClusterState clusterState;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    clusterState = new LiveClusterState(storageUtil.storage);
+  }
+
+  // Builds a task populated with nothing but its task ID and slave ID.
+  private IScheduledTask makeTask(String taskId, String slaveId) {
+    AssignedTask assigned = new AssignedTask().setTaskId(taskId).setSlaveId(slaveId);
+    return IScheduledTask.build(new ScheduledTask().setAssignedTask(assigned));
+  }
+
+  @Test
+  public void testEmptyStorage() {
+    // The candidate query is answered with no tasks at all.
+    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY);
+    control.replay();
+
+    ImmutableMultimap<String, IAssignedTask> expected = ImmutableMultimap.of();
+    assertEquals(expected, clusterState.getSlavesToActiveTasks());
+  }
+
+  @Test
+  public void testGetActiveTasks() {
+    // Two tasks share slave "1"; the third sits alone on slave "2".
+    IScheduledTask onFirstA = makeTask("a", "1");
+    IScheduledTask onFirstB = makeTask("b", "1");
+    IScheduledTask onSecond = makeTask("c", "2");
+    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY, onFirstA, onFirstB, onSecond);
+
+    control.replay();
+
+    ImmutableMultimap<String, IAssignedTask> expected =
+        ImmutableMultimap.<String, IAssignedTask>builder()
+            .put("1", onFirstA.getAssignedTask())
+            .put("1", onFirstB.getAssignedTask())
+            .put("2", onSecond.getAssignedTask())
+            .build();
+    assertEquals(expected, clusterState.getSlavesToActiveTasks());
+  }
+}