You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2018/01/09 22:51:09 UTC

[1/2] aurora git commit: Refactor scheduling code to split matching and assigning phases

Repository: aurora
Updated Branches:
  refs/heads/master 5b34231ba -> 4e6242fed


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 348386e..b3ffb0d 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -39,13 +39,13 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.offers.HostOffer;
+import org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceType;
@@ -67,9 +67,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER;
 import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
@@ -110,7 +107,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   private SchedulingFilter schedulingFilter;
   private FakeStatsProvider statsProvider;
   private PreemptorMetrics preemptorMetrics;
-  private TierManager tierManager;
   private FakeClock clock;
 
   @Before
@@ -119,7 +115,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     storageUtil.expectOperations();
     statsProvider = new FakeStatsProvider();
     preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
-    tierManager = createMock(TierManager.class);
     clock = new FakeClock();
     ResourceType.initializeEmptyCliArgsForTest();
   }
@@ -129,12 +124,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
       Optional<HostOffer> offer,
       ScheduledTask... victims) {
 
-    PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
-        new PreemptionVictimFilter.PreemptionVictimFilterImpl(
-            schedulingFilter,
-            TaskExecutors.NO_OVERHEAD_EXECUTOR,
-            preemptorMetrics,
-            tierManager);
+    PreemptionVictimFilterImpl filter = new PreemptionVictimFilterImpl(
+        schedulingFilter,
+        TaskExecutors.NO_OVERHEAD_EXECUTOR,
+        preemptorMetrics,
+        TaskTestUtil.TIER_MANAGER);
 
     return filter.filterPreemptionVictims(
         ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
@@ -145,16 +139,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testPreempted() throws Exception {
+  public void testPreempted() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
     assignToHost(lowPriority);
-    expectGetTier(lowPriority, DEV_TIER).times(2);
 
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
-    expectGetTier(highPriority, DEV_TIER);
 
     expectFiltering();
 
@@ -163,7 +155,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testLowestPriorityPreempted() throws Exception {
+  public void testLowestPriorityPreempted() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
@@ -172,10 +164,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
     ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
     assignToHost(lowerPriority);
-    expectGetTier(lowerPriority, DEV_TIER).atLeastOnce();
 
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
-    expectGetTier(highPriority, DEV_TIER);
 
     expectFiltering();
 
@@ -184,24 +174,20 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testOnePreemptableTask() throws Exception {
+  public void testOnePreemptableTask() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
     assignToHost(highPriority);
-    expectGetTier(highPriority, DEV_TIER);
 
     ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
     assignToHost(lowerPriority);
-    expectGetTier(lowerPriority, DEV_TIER);
 
     ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
     assignToHost(lowestPriority);
-    expectGetTier(lowestPriority, DEV_TIER).times(2);
 
     ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
-    expectGetTier(pendingPriority, DEV_TIER).times(3);
 
     expectFiltering();
 
@@ -212,29 +198,25 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testHigherPriorityRunning() throws Exception {
+  public void testHigherPriorityRunning() {
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
     assignToHost(highPriority);
-    expectGetTier(highPriority, DEV_TIER);
 
     ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
-    expectGetTier(task, DEV_TIER);
 
     control.replay();
     assertNoVictims(runFilter(task, NO_OFFER, highPriority));
   }
 
   @Test
-  public void testProductionPreemptingNonproduction() throws Exception {
+  public void testProductionPreemptingNonproduction() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    expectGetTier(p1, PREFERRED_TIER);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
-    expectGetTier(a1, DEV_TIER).times(2);
     assignToHost(a1);
 
     expectFiltering();
@@ -244,16 +226,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+  public void testProductionPreemptingNonproductionAcrossUsers() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    expectGetTier(p1, PREFERRED_TIER);
     ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER).times(2);
 
     expectFiltering();
 
@@ -262,12 +242,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   }
 
   @Test
-  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+  public void testProductionUsersDoNotPreemptEachOther() {
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
-    expectGetTier(p1, PREFERRED_TIER);
     ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
-    expectGetTier(a1, PREFERRED_TIER);
     assignToHost(a1);
 
     control.replay();
@@ -276,17 +254,15 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures a production task can preempt 2 tasks on the same host.
   @Test
-  public void testProductionPreemptingManyNonProduction() throws Exception {
+  public void testProductionPreemptingManyNonProduction() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
-    expectGetTier(a1, DEV_TIER).atLeastOnce();
 
     ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
     setResource(b1, CPUS, 1.0);
     setResource(b1, RAM_MB, 512.0);
-    expectGetTier(b1, DEV_TIER).atLeastOnce();
 
     setUpHost();
 
@@ -296,7 +272,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     setResource(p1, CPUS, 2.0);
     setResource(p1, RAM_MB, 1024.0);
-    expectGetTier(p1, PREFERRED_TIER).times(2);
 
     control.replay();
     assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
@@ -304,12 +279,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures we select the minimal number of tasks to preempt
   @Test
-  public void testMinimalSetPreempted() throws Exception {
+  public void testMinimalSetPreempted() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 4.0);
     setResource(a1, RAM_MB, 4096.0);
-    expectGetTier(a1, DEV_TIER).atLeastOnce();
 
     ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
     b1.getAssignedTask().getTask()
@@ -318,12 +292,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
             ramMb(512)));
     setResource(b1, CPUS, 1.0);
     setResource(b1, RAM_MB, 512.0);
-    expectGetTier(b1, DEV_TIER).anyTimes();
 
     ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
     setResource(b2, CPUS, 1.0);
     setResource(b2, RAM_MB, 512.0);
-    expectGetTier(b2, DEV_TIER).anyTimes();
 
     setUpHost();
 
@@ -334,7 +306,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
     setResource(p1, CPUS, 2.0);
     setResource(p1, RAM_MB, 1024.0);
-    expectGetTier(p1, PREFERRED_TIER).times(3);
 
     control.replay();
     assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
@@ -342,14 +313,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures a production task *never* preempts a production task from another job.
   @Test
-  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+  public void testProductionJobNeverPreemptsProductionJob() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
     p1.getAssignedTask().getTask()
         .setResources(ImmutableSet.of(
             numCpus(2),
             ramMb(1024)));
-    expectGetTier(p1, PREFERRED_TIER);
 
     setUpHost();
 
@@ -360,7 +330,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
         .setResources(ImmutableSet.of(
             numCpus(1),
             ramMb(512)));
-    expectGetTier(p2, PREFERRED_TIER);
 
     control.replay();
     assertNoVictims(runFilter(p2, NO_OFFER, p1));
@@ -368,7 +337,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
   @Test
-  public void testPreemptWithOfferAndTask() throws Exception {
+  public void testPreemptWithOfferAndTask() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
 
     setUpHost();
@@ -379,14 +348,12 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
             numCpus(1),
             ramMb(512)));
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER).times(2);
 
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     p1.getAssignedTask().getTask()
         .setResources(ImmutableSet.of(
             numCpus(2),
             ramMb(1024)));
-    expectGetTier(p1, PREFERRED_TIER);
 
     control.replay();
     assertVictims(
@@ -399,7 +366,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures revocable offer resources are filtered out.
   @Test
-  public void testRevocableOfferFiltered() throws Exception {
+  public void testRevocableOfferFiltered() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
 
     setUpHost();
@@ -408,12 +375,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER).times(2);
 
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     setResource(p1, CPUS, 2.0);
     setResource(p1, RAM_MB, 1024.0);
-    expectGetTier(p1, PREFERRED_TIER);
 
     control.replay();
     assertNoVictims(runFilter(
@@ -424,21 +389,20 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures revocable task CPU is not considered for preemption.
   @Test
-  public void testRevocableVictimsFiltered() throws Exception {
+  public void testRevocableVictimsFiltered() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
 
     setUpHost();
 
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setTier(TaskTestUtil.REVOCABLE_TIER_NAME);
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, REVOCABLE_TIER).times(2);
 
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     setResource(p1, CPUS, 2.0);
     setResource(p1, RAM_MB, 1024.0);
-    expectGetTier(p1, PREFERRED_TIER);
 
     control.replay();
     assertNoVictims(runFilter(
@@ -449,7 +413,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures revocable victim non-compressible resources are still considered.
   @Test
-  public void testRevocableVictimRamUsed() throws Exception {
+  public void testRevocableVictimRamUsed() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
 
     setUpHost();
@@ -458,12 +422,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, REVOCABLE_TIER).times(2);
 
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     setResource(p1, CPUS, 2.0);
     setResource(p1, RAM_MB, 1024.0);
-    expectGetTier(p1, PREFERRED_TIER);
 
     control.replay();
     assertVictims(
@@ -476,7 +438,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
   @Test
-  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+  public void testPreemptWithOfferAndMultipleTasks() {
     schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
 
     setUpHost();
@@ -485,18 +447,15 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER).atLeastOnce();
 
     ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
     setResource(a2, CPUS, 1.0);
     setResource(a2, RAM_MB, 512.0);
     assignToHost(a2);
-    expectGetTier(a2, DEV_TIER).atLeastOnce();
 
     ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
     setResource(p1, CPUS, 4.0);
     setResource(p1, RAM_MB, 2048.0);
-    expectGetTier(p1, PREFERRED_TIER).times(2);
 
     control.replay();
     Optional<HostOffer> offer =
@@ -519,13 +478,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
     assignToHost(task);
-    expectGetTier(task, PREFERRED_TIER);
 
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER);
 
     expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.empty());
 
@@ -540,13 +497,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
     assignToHost(task);
-    expectGetTier(task, PREFERRED_TIER);
 
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     setResource(a1, CPUS, 1.0);
     setResource(a1, RAM_MB, 512.0);
     assignToHost(a1);
-    expectGetTier(a1, DEV_TIER).times(2);
 
     setUpHost();
     expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
@@ -661,11 +616,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
         .andAnswer(() -> veto.map(ImmutableSet::of).orElse(ImmutableSet.of()));
   }
 
-  private IExpectationSetters<TierInfo> expectGetTier(ScheduledTask task, TierInfo tier) {
-    return expect(tierManager.getTier(ITaskConfig.build(task.getAssignedTask().getTask())))
-        .andReturn(tier);
-  }
-
   private static void setResource(ScheduledTask task, ResourceType type, Double value) {
     task.getAssignedTask().setTask(ResourceTestUtil.resetResource(
         ITaskConfig.build(task.getAssignedTask().getTask()),
@@ -686,17 +636,18 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
         .setTask(new TaskConfig()
             .setJob(new JobKey(role, env, job))
             .setPriority(priority)
+            .setTier(production ? TaskTestUtil.PROD_TIER_NAME : null)
             .setProduction(production)
             .setConstraints(Sets.newHashSet())
             .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config")));
     return new ScheduledTask().setAssignedTask(assignedTask);
   }
 
-  static ScheduledTask makeTask(String role, String job, String taskId) {
+  private static ScheduledTask makeTask(String role, String job, String taskId) {
     return makeTask(role, job, taskId, 0, "dev", false);
   }
 
-  static void addEvent(ScheduledTask task, ScheduleStatus status) {
+  private static void addEvent(ScheduledTask task, ScheduleStatus status) {
     task.addToTaskEvents(new TaskEvent(0, status));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
index 5e2fdcb..e6b2b74 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
@@ -16,15 +16,14 @@ package org.apache.aurora.scheduler.scheduling;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.offers.HostOffer;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -32,10 +31,8 @@ import static org.junit.Assert.assertFalse;
 public class FirstFitOfferSelectorTest extends EasyMockTest {
 
   private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
-  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
-      TASK.getTask(),
-      ResourceBag.EMPTY,
-      empty());
+  private static final ResourceRequest EMPTY_REQUEST =
+      TaskTestUtil.toResourceRequest(TASK.getTask());
 
   private OfferSelector firstFitOfferSelector;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
index 78e1269..f84941e 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
@@ -26,28 +26,22 @@ import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
 import org.apache.mesos.v1.Protos.TaskID;
 import org.apache.mesos.v1.Protos.TaskInfo;
 import org.junit.Before;
@@ -55,10 +49,10 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -88,27 +82,22 @@ public class TaskAssignerImplTest extends EasyMockTest {
           .setHost(MESOS_OFFER.getHostname())
           .setAttributes(ImmutableSet.of(
               new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
-  private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+  private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
-      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+      .setTaskId(TaskID.newBuilder().setValue(TASK.getTaskId()))
       .setAgentId(MESOS_OFFER.getAgentId())
       .build();
-  private static final IInstanceKey INSTANCE_KEY =
-      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+  private static final IInstanceKey INSTANCE_KEY = InstanceKeys.from(JOB, TASK.getInstanceId());
   private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
-  private static final HostOffer OFFER_2 = new HostOffer(
-      Offer.newBuilder()
-          .setId(OfferID.newBuilder().setValue("offerId0"))
-          .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-          .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-          .setHostname("hostName0")
-          .addResources(mesosRange(PORTS, PORT))
-          .addResources(mesosScalar(CPUS, 1))
-          .addResources(mesosScalar(RAM_MB, 1024))
-          .build(),
-      IHostAttributes.build(new HostAttributes()));
+  private static final Offer MESOS_OFFER_2 =
+      offer("offer-2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
+  private static final HostOffer OFFER_2 =
+      new HostOffer(MESOS_OFFER_2, IHostAttributes.build(new HostAttributes()
+          .setHost(MESOS_OFFER_2.getHostname())
+          .setAttributes(ImmutableSet.of(
+              new Attribute("host", ImmutableSet.of(MESOS_OFFER_2.getHostname()))))));
 
   private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
 
@@ -120,17 +109,15 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private MesosTaskFactory taskFactory;
   private OfferManager offerManager;
   private TaskAssignerImpl assigner;
-  private TierManager tierManager;
   private FakeStatsProvider statsProvider;
   private UpdateAgentReserver updateAgentReserver;
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     storeProvider = createMock(MutableStoreProvider.class);
     taskFactory = createMock(MesosTaskFactory.class);
     stateManager = createMock(StateManager.class);
     offerManager = createMock(OfferManager.class);
-    tierManager = createMock(TierManager.class);
     updateAgentReserver = createMock(UpdateAgentReserver.class);
     statsProvider = new FakeStatsProvider();
     // TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing,
@@ -140,43 +127,49 @@ public class TaskAssignerImplTest extends EasyMockTest {
         stateManager,
         taskFactory,
         offerManager,
-        tierManager,
         updateAgentReserver,
         statsProvider,
         offerSelector);
     aggregate = empty();
-    resourceRequest = new ResourceRequest(
-        TASK.getAssignedTask().getTask(),
-        ResourceBag.EMPTY,
-        aggregate);
+    resourceRequest = ResourceRequest.fromTask(
+        TASK.getTask(),
+        NO_OVERHEAD_EXECUTOR,
+        aggregate,
+        TaskTestUtil.TIER_MANAGER);
   }
 
   @Test
-  public void testAssignNoTasks() throws Exception {
+  public void testAssignNoTasks() {
     control.replay();
 
     assertEquals(
         NO_ASSIGNMENT,
-        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            GROUP_KEY,
+            ImmutableSet.of(),
+            NO_RESERVATION));
   }
 
   @Test
   public void testAssignmentClearedOnError() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
-        .andReturn(ImmutableSet.of(OFFER, OFFER_2));
+    expect(updateAgentReserver.isReserved(anyString())).andReturn(false).atLeastOnce();
+    expect(updateAgentReserver.getAgent(anyObject())).andReturn(Optional.empty()).atLeastOnce();
+
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
+        .andReturn(ImmutableSet.of(OFFER, OFFER_2)).atLeastOnce();
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expectAssignTask(MESOS_OFFER);
     expect(stateManager.changeState(
         storeProvider,
-        Tasks.id(TASK),
+        TASK.getTaskId(),
         Optional.of(PENDING),
         LOST,
         LAUNCH_FAILED_MSG))
         .andReturn(StateChangeResult.SUCCESS);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+    expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
         .andReturn(TASK_INFO);
 
     control.replay();
@@ -188,9 +181,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            TaskGroupKey.from(TASK.getTask()),
             ImmutableSet.of(
-                TASK.getAssignedTask(),
+                TASK,
                 makeTask("id2", JOB).getAssignedTask(),
                 makeTask("id3", JOB).getAssignedTask()),
             NO_RESERVATION));
@@ -198,10 +191,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testAssignmentSkippedForReservedSlave() throws Exception {
+  public void testAssignmentSkippedForReservedSlave() {
     expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
         .andReturn(ImmutableSet.of(OFFER));
 
     control.replay();
@@ -211,8 +203,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
+            TaskGroupKey.from(TASK.getTask()),
+            ImmutableSet.of(TASK),
             ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
                 ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
   }
@@ -223,73 +215,68 @@ public class TaskAssignerImplTest extends EasyMockTest {
     // and permissive in task->slave direction. In other words, a task with a slave reservation
     // should still be tried against other unreserved slaves.
     expectNoUpdateReservations(1);
-    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
         .andReturn(ImmutableSet.of(OFFER_2, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expectAssignTask(OFFER_2.getOffer());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
+    expect(taskFactory.createFrom(TASK, OFFER_2.getOffer(), false))
         .andReturn(TASK_INFO);
     offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
 
     control.replay();
 
     assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
+        ImmutableSet.of(TASK.getTaskId()),
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
+            TaskGroupKey.from(TASK.getTask()),
+            ImmutableSet.of(TASK),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
 
   @Test
   public void testResourceMapperCallback() {
-    AssignedTask builder = TASK.newBuilder().getAssignedTask();
+    AssignedTask builder = TASK.newBuilder();
     builder.unsetAssignedPorts();
 
     control.replay();
 
     assertEquals(
-        TASK.getAssignedTask(),
+        TASK,
         assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
   }
 
   @Test
   public void testAssignToReservedAgent() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
     expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
     updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
         .andReturn(Optional.of(OFFER));
     expectAssignTask(MESOS_OFFER);
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
 
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+    expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
         .andReturn(TASK_INFO);
 
     control.replay();
 
     assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
+        ImmutableSet.of(TASK.getTaskId()),
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            TaskGroupKey.from(TASK.getTask()),
             ImmutableSet.of(
-                TASK.getAssignedTask()),
+                TASK),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertNotEquals(empty(), aggregate);
   }
 
   @Test
-  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+  public void testAssignReservedAgentWhenOfferNotReady() {
     expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
         .andReturn(Optional.empty());
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expectLastCall();
 
     control.replay();
@@ -299,55 +286,48 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
+            TaskGroupKey.from(TASK.getTask()),
+            ImmutableSet.of(TASK),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertEquals(empty(), aggregate);
   }
 
   @Test
   public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
     expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
     updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
         .andReturn(Optional.of(OFFER));
     expectAssignTask(MESOS_OFFER);
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+    expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
         .andReturn(TASK_INFO);
 
-    // Normal scheduling loop for the remaining task...
-    IScheduledTask secondTask = makeTask("another-task", JOB, 9999);
+    // Normal scheduling loop for the remaining task.
+    IAssignedTask secondTask = makeTask("another-task", JOB, 9999).getAssignedTask();
     TaskInfo secondTaskInfo = TaskInfo.newBuilder()
         .setName("another-task")
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask)))
+        .setTaskId(TaskID.newBuilder().setValue(secondTask.getTaskId()))
         .setAgentId(MESOS_OFFER.getAgentId())
         .build();
     expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.empty());
-    ImmutableSet<HostOffer> matchingOffers = ImmutableSet.of(OFFER);
-    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
-        .andReturn(matchingOffers);
-    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
-        .andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER, secondTask);
-    offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo);
-    expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(secondTaskInfo);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
+        .andReturn(ImmutableSet.of(OFFER_2));
+    expect(updateAgentReserver.isReserved(OFFER_2.getOffer().getAgentId().getValue()))
+        .andReturn(false);
+    expectAssignTask(MESOS_OFFER_2, secondTask);
+    offerManager.launchTask(MESOS_OFFER_2.getId(), secondTaskInfo);
+    expect(taskFactory.createFrom(secondTask, MESOS_OFFER_2, false)).andReturn(secondTaskInfo);
 
     control.replay();
 
     assertEquals(
-        Tasks.ids(TASK, secondTask),
+        ImmutableSet.of(TASK.getTaskId(), secondTask.getTaskId()),
         assigner.maybeAssign(
             storeProvider,
             resourceRequest,
             GROUP_KEY,
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                secondTask.getAssignedTask()),
+            ImmutableSet.of(TASK, secondTask),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertNotEquals(empty(), aggregate);
   }
@@ -356,19 +336,19 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expectAssignTask(offer, TASK);
   }
 
-  private void expectAssignTask(Offer offer, IScheduledTask task) {
+  private void expectAssignTask(Offer offer, IAssignedTask task) {
     expect(stateManager.assignTask(
         eq(storeProvider),
-        eq(Tasks.id(task)),
+        eq(task.getTaskId()),
         eq(offer.getHostname()),
         eq(offer.getAgentId()),
-        anyObject())).andReturn(task.getAssignedTask());
+        anyObject())).andReturn(task);
   }
 
   private void expectNoUpdateReservations(int offers) {
-    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
-    for (int i = 0; i < offers; i++) {
-      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    if (offers > 0) {
+      expect(updateAgentReserver.isReserved(anyString())).andReturn(false).times(offers);
     }
+    expect(updateAgentReserver.getAgent(anyObject())).andReturn(Optional.empty());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index ecf2987..ac2df94 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -31,6 +31,7 @@ import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
@@ -44,8 +45,6 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -62,6 +61,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
 import static org.easymock.EasyMock.eq;
@@ -105,7 +105,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
-
+            bind(TierManager.class).toInstance(TIER_MANAGER);
             bind(Executor.class).annotatedWith(AsyncExecutor.class)
                 .toInstance(MoreExecutors.directExecutor());
             bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
@@ -127,28 +127,24 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         ImmutableSet.of(task));
   }
 
-  private ResourceBag bag(IScheduledTask task) {
-    return ResourceManager.bagFromResources(task.getAssignedTask().getTask().getResources())
-        .add(THERMOS_EXECUTOR.getExecutorOverhead(task.getAssignedTask()
-            .getTask()
-            .getExecutorConfig()
-            .getName()).get());
-  }
-
   private IExpectationSetters<Set<String>> expectAssigned(
       IScheduledTask task,
       Map<String, TaskGroupKey> reservationMap) {
 
     return expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
-        new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()),
+        ResourceRequest.fromTask(
+            task.getAssignedTask().getTask(),
+            THERMOS_EXECUTOR,
+            empty(),
+            TIER_MANAGER),
         TaskGroupKey.from(task.getAssignedTask().getTask()),
         ImmutableSet.of(task.getAssignedTask()),
         reservationMap));
   }
 
   @Test
-  public void testSchedule() throws Exception {
+  public void testSchedule() {
     storageUtil.expectOperations();
 
     expectAsMap(NO_RESERVATION);
@@ -164,7 +160,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testScheduleNoTask() throws Exception {
+  public void testScheduleNoTask() {
     storageUtil.expectOperations();
     storageUtil.expectTaskFetch(
         Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING),
@@ -178,7 +174,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testSchedulePartial() throws Exception {
+  public void testSchedulePartial() {
     storageUtil.expectOperations();
 
     String taskB = "b";
@@ -214,7 +210,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservation() throws Exception {
+  public void testReservation() {
     storageUtil.expectOperations();
 
     // No reservation available in preemptor
@@ -254,7 +250,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationUnusable() throws Exception {
+  public void testReservationUnusable() {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
@@ -271,7 +267,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationRemoved() throws Exception {
+  public void testReservationRemoved() {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
@@ -288,14 +284,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testNonPendingIgnored() throws Exception {
+  public void testNonPendingIgnored() {
     control.replay();
 
     eventSink.post(TaskStateChange.transition(TASK_A, RUNNING));
   }
 
   @Test
-  public void testPendingDeletedHandled() throws Exception {
+  public void testPendingDeletedHandled() {
     reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
 
     control.replay();
@@ -325,7 +321,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAsMap(NO_RESERVATION);
     expect(assigner.maybeAssign(
         EasyMock.anyObject(),
-        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())),
+        eq(ResourceRequest.fromTask(
+            taskA.getAssignedTask().getTask(),
+            THERMOS_EXECUTOR,
+            empty(),
+            TIER_MANAGER)),
         eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
         eq(ImmutableSet.of(taskA.getAssignedTask())),
         eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT);
@@ -337,7 +337,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testScheduleThrows() throws Exception {
+  public void testScheduleThrows() {
     storageUtil.expectOperations();
 
     expectAsMap(NO_RESERVATION);
@@ -370,17 +370,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
   }
 
-  private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+  private void expectGetReservation(IScheduledTask task, String slaveId) {
+    expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
         .andReturn(ImmutableSet.of(slaveId));
   }
 
-  private IExpectationSetters<?> expectNoReservation(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+  private void expectNoReservation(IScheduledTask task) {
+    expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
         .andReturn(ImmutableSet.of());
   }
 
-  private IExpectationSetters<?> expectAsMap(Map<String, TaskGroupKey> map) {
-    return expect(reservations.asMap()).andReturn(map);
+  private void expectAsMap(Map<String, TaskGroupKey> map) {
+    expect(reservations.asMap()).andReturn(map);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
index d8563b8..6eebdb5 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
@@ -21,7 +21,6 @@ import org.apache.aurora.scheduler.updater.UpdateAgentReserver.NullAgentReserver
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 public class NullAgentReserverTest extends EasyMockTest {
   private static final IInstanceKey INSTANCE_KEY =
@@ -33,6 +32,6 @@ public class NullAgentReserverTest extends EasyMockTest {
     NullAgentReserver reserver = new NullAgentReserver();
     reserver.reserve("test", INSTANCE_KEY);
     assertFalse(reserver.getAgent(INSTANCE_KEY).isPresent());
-    assertTrue(reserver.getReservations("test").isEmpty());
+    assertFalse(reserver.isReserved("test"));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
index 7f17be0..051ac0e 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
@@ -13,25 +13,19 @@
  */
 package org.apache.aurora.scheduler.updater;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Resource;
-import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.updater.UpdateAgentReserver.UpdateAgentReserverImpl;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -43,16 +37,6 @@ public class UpdateAgentReserverImplTest extends EasyMockTest {
   private static final IInstanceKey INSTANCE_KEY =
       InstanceKeys.from(JobKeys.from("role", "env", "name"), 1);
 
-  private TaskGroupKey getTaskGroup(IInstanceKey key) {
-    return TaskGroupKey.from(ITaskConfig.build(
-        new TaskConfig()
-            .setJob(key.getJobKey().newBuilder())
-            .setResources(ImmutableSet.of(
-                Resource.numCpus(1.0),
-                Resource.ramMb(1L),
-                Resource.diskMb(1L)))));
-  }
-
   @Before
   public void setUp() {
     cache = createMock(new Clazz<BiCache<IInstanceKey, String>>() { });
@@ -76,32 +60,11 @@ public class UpdateAgentReserverImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testGetReservations() {
+  public void testIsReserved() {
     expect(cache.getByValue(AGENT_ID)).andReturn(ImmutableSet.of(INSTANCE_KEY));
+    expect(cache.getByValue(AGENT_ID)).andReturn(ImmutableSet.of());
     control.replay();
-    assertEquals(ImmutableSet.of(INSTANCE_KEY), reserver.getReservations(AGENT_ID));
+    assertTrue(reserver.isReserved(AGENT_ID));
+    assertFalse(reserver.isReserved(AGENT_ID));
   }
-
-  @Test
-  public void testHasReservations() {
-    IInstanceKey instanceKey2 = InstanceKeys.from(JobKeys.from("role", "env", "name"), 2);
-    IInstanceKey instanceKey3 = InstanceKeys.from(JobKeys.from("role2", "env2", "name2"), 1);
-    expect(cache.asMap())
-        .andReturn(ImmutableMap.of(
-            INSTANCE_KEY,
-            AGENT_ID,
-            instanceKey2,
-            AGENT_ID,
-            instanceKey3,
-            "different-agent")).anyTimes();
-    control.replay();
-    assertTrue(reserver.hasReservations(getTaskGroup(INSTANCE_KEY)));
-    assertTrue(reserver.hasReservations(getTaskGroup(instanceKey2)));
-    assertTrue(reserver.hasReservations(getTaskGroup(instanceKey3)));
-    assertTrue(reserver.hasReservations(
-        getTaskGroup(InstanceKeys.from(JobKeys.from("role", "env", "name"), 3))));
-    assertFalse(reserver.hasReservations(
-        getTaskGroup(InstanceKeys.from(JobKeys.from("not", "in", "map"), 1))));
-  }
-
 }


[2/2] aurora git commit: Refactor scheduling code to split matching and assigning phases

Posted by wf...@apache.org.
Refactor scheduling code to split matching and assigning phases

This patch sets the stage for performing the bulk of scheduling work in a
separate call path, without holding the write lock.

Also included is a mechanical refactor pushing the `revocable` flag into
`ResourceRequest` (which was ~always needed as a sibling parameter).

Reviewed at https://reviews.apache.org/r/64954/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e6242fe
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e6242fe
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e6242fe

Branch: refs/heads/master
Commit: 4e6242fed68650e7be906ec3d17ae750f49f8cb8
Parents: 5b34231
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 9 14:50:51 2018 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 9 14:50:51 2018 -0800

----------------------------------------------------------------------
 .../common/testing/easymock/EasyMockTest.java   |  14 +-
 .../benchmark/fakes/FakeOfferManager.java       |  12 +-
 .../apache/aurora/scheduler/TierManager.java    |   4 +-
 .../aurora/scheduler/base/TaskTestUtil.java     |  14 +-
 .../executor/ExecutorSettings.java              |  21 +-
 .../scheduler/filter/SchedulingFilter.java      |  42 +++-
 .../scheduler/mesos/MesosTaskFactory.java       |   7 +-
 .../aurora/scheduler/offers/HostOffers.java     |  29 +--
 .../aurora/scheduler/offers/OfferManager.java   |  10 +-
 .../scheduler/offers/OfferManagerImpl.java      |  12 +-
 .../scheduler/preemptor/PreemptionVictim.java   |   5 +-
 .../preemptor/PreemptionVictimFilter.java       |  20 +-
 .../scheduler/resources/ResourceManager.java    |   6 +
 .../scheduler/scheduling/TaskAssigner.java      |   2 +-
 .../scheduler/scheduling/TaskAssignerImpl.java  | 237 +++++++++----------
 .../scheduler/scheduling/TaskScheduler.java     |   2 +-
 .../scheduler/scheduling/TaskSchedulerImpl.java | 100 ++++----
 .../aurora/scheduler/storage/TaskStore.java     |   3 +-
 .../storage/durability/WriteRecorder.java       |   3 +-
 .../scheduler/updater/UpdateAgentReserver.java  |  33 +--
 .../aurora/scheduler/TierManagerTest.java       |   7 +
 .../events/NotifyingSchedulingFilterTest.java   |   6 +-
 .../filter/SchedulingFilterImplTest.java        |  36 ++-
 .../mesos/MesosTaskFactoryImplTest.java         |  12 +-
 .../scheduler/offers/OfferManagerImplTest.java  |  51 ++--
 .../preemptor/PreemptionVictimFilterTest.java   | 101 ++------
 .../scheduling/FirstFitOfferSelectorTest.java   |   9 +-
 .../scheduling/TaskAssignerImplTest.java        | 170 ++++++-------
 .../scheduling/TaskSchedulerImplTest.java       |  56 ++---
 .../updater/NullAgentReserverTest.java          |   3 +-
 .../updater/UpdateAgentReserverImplTest.java    |  45 +---
 31 files changed, 476 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
index c5500b3..15fc677 100644
--- a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
+++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
@@ -26,6 +26,9 @@ import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
 import static org.easymock.EasyMock.createControl;
 
@@ -38,6 +41,16 @@ import static org.easymock.EasyMock.createControl;
 public abstract class EasyMockTest extends TearDownTestCase {
   protected IMocksControl control;
 
+  @Rule
+  public final TestWatcher verifyControl = new TestWatcher() {
+    @Override
+    protected void succeeded(Description description) {
+      // Only attempt to verify the control when the test case otherwise succeeded.  This prevents
+      // spurious mock-related error messages that distract from the real error.
+      control.verify();
+    }
+  };
+
   /**
    * Creates an EasyMock {@link #control} for tests to use that will be automatically
    * {@link IMocksControl#verify() verified} on tear down.
@@ -45,7 +58,6 @@ public abstract class EasyMockTest extends TearDownTestCase {
   @Before
   public final void setupEasyMock() {
     control = createControl();
-    addTearDown(() -> control.verify());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f0dacd4..0a105c7 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -15,6 +15,8 @@ package org.apache.aurora.benchmark.fakes;
 
 import java.util.Optional;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -59,18 +61,14 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
     return Optional.empty();
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return null;
+    return ImmutableList.of();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/TierManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java
index c6ad2b1..a37fea4 100644
--- a/src/main/java/org/apache/aurora/scheduler/TierManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.Map;
+import java.util.Optional;
 
 import javax.inject.Inject;
 
@@ -102,7 +103,8 @@ public interface TierManager {
           !taskConfig.isSetTier() || tierConfig.tiers.containsKey(taskConfig.getTier()),
           "Invalid tier '%s' in TaskConfig.", taskConfig.getTier());
 
-      return tierConfig.tiers.get(taskConfig.getTier());
+      return tierConfig.tiers.get(
+          Optional.ofNullable(taskConfig.getTier()).orElse(tierConfig.defaultTier));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 22d5a64..2b61c27 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,6 +47,8 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -70,12 +72,14 @@ public final class TaskTestUtil {
       new TierInfo(true /* preemptible */, false /* revocable */);
   public static final TierInfo PREFERRED_TIER =
       new TierInfo(false /* preemptible */, false /* revocable */);
+  public static final String REVOCABLE_TIER_NAME = "tier-revocable";
   public static final String PROD_TIER_NAME = "tier-prod";
   public static final String DEV_TIER_NAME = "tier-dev";
   public static final TierConfig TIER_CONFIG =
       new TierConfig(DEV_TIER_NAME, ImmutableMap.of(
           PROD_TIER_NAME, PREFERRED_TIER,
-          DEV_TIER_NAME, DEV_TIER
+          DEV_TIER_NAME, DEV_TIER,
+          REVOCABLE_TIER_NAME, REVOCABLE_TIER
       ));
   public static final TierManager TIER_MANAGER = new TierManager.TierManagerImpl(TIER_CONFIG);
   public static final ThriftBackfill THRIFT_BACKFILL = new ThriftBackfill(TIER_MANAGER);
@@ -237,4 +241,12 @@ public final class TaskTestUtil {
         new org.apache.aurora.gen.TierConfig("revocable", REVOCABLE_TIER.toMap())
     );
   }
+
+  public static ResourceRequest toResourceRequest(ITaskConfig task) {
+    return ResourceRequest.fromTask(
+        task,
+        EXECUTOR_SETTINGS,
+        AttributeAggregate.empty(),
+        TIER_MANAGER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
index 5c987fd..dac84e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
@@ -19,6 +19,9 @@ import java.util.Optional;
 
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -26,6 +29,9 @@ import static java.util.Objects.requireNonNull;
  * Configuration for the executor to run, and resource overhead required for it.
  */
 public class ExecutorSettings {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorSettings.class);
+
   private final Map<String, ExecutorConfig> config;
   private final boolean populateDiscoveryInfo;
 
@@ -45,12 +51,19 @@ public class ExecutorSettings {
     return populateDiscoveryInfo;
   }
 
-  public Optional<ResourceBag> getExecutorOverhead(String name) {
+  public ResourceBag getExecutorOverhead(ITaskConfig task) {
+    if (!task.isSetExecutorConfig()) {
+      // Docker-based tasks don't need executors
+      return ResourceBag.EMPTY;
+    }
+
+    String name = task.getExecutorConfig().getName();
     if (config.containsKey(name)) {
-      return Optional.of(
-          ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList()));
+      return ResourceManager.bagFromMesosResources(
+          config.get(name).getExecutor().getResourcesList());
     } else {
-      return Optional.empty();
+      LOG.warn("No executor configuration found for " + name);
+      return ResourceBag.EMPTY;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index bd41590..fd97259 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,12 +21,17 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED;
@@ -306,11 +311,31 @@ public interface SchedulingFilter {
     private final ITaskConfig task;
     private final ResourceBag request;
     private final AttributeAggregate jobState;
+    private final boolean revocable;
+
+    private ResourceRequest(
+        ITaskConfig task,
+        ResourceBag request,
+        AttributeAggregate jobState,
+        boolean revocable) {
 
-    public ResourceRequest(ITaskConfig task, ResourceBag request, AttributeAggregate jobState) {
-      this.task = task;
-      this.request = request;
-      this.jobState = jobState;
+      this.task = requireNonNull(task);
+      this.request = requireNonNull(request);
+      this.jobState = requireNonNull(jobState);
+      this.revocable = revocable;
+    }
+
+    public static ResourceRequest fromTask(
+        ITaskConfig task,
+        ExecutorSettings executorSettings,
+        AttributeAggregate jobState,
+        TierManager tierManager) {
+
+      return new ResourceRequest(
+          task,
+          ResourceManager.bagFromTask(task, executorSettings),
+          jobState,
+          tierManager.getTier(task).isRevocable());
     }
 
     public Iterable<IConstraint> getConstraints() {
@@ -329,6 +354,10 @@ public interface SchedulingFilter {
       return jobState;
     }
 
+    public boolean isRevocable() {
+      return revocable;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ResourceRequest)) {
@@ -338,12 +367,13 @@ public interface SchedulingFilter {
       ResourceRequest other = (ResourceRequest) o;
       return Objects.equals(task, other.task)
           && Objects.equals(request, other.request)
-          && Objects.equals(jobState, other.jobState);
+          && Objects.equals(jobState, other.jobState)
+          && revocable == other.revocable;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(task, request, jobState);
+      return Objects.hash(task, request, jobState, revocable);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index cb288bb..bcb2bbf 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -152,12 +152,7 @@ public interface MesosTaskFactory {
 
       ITaskConfig config = task.getTask();
 
-      // Docker-based tasks don't need executors
-      ResourceBag executorOverhead = ResourceBag.EMPTY;
-      if (config.isSetExecutorConfig()) {
-        executorOverhead =
-            executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY);
-      }
+      ResourceBag executorOverhead = executorSettings.getExecutorOverhead(config);
 
       AcceptedOffer acceptedOffer;
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
index a01c0a8..2ea7a01 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -154,22 +154,13 @@ class HostOffers {
         .toSet();
   }
 
-  synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                               ResourceRequest resourceRequest,
-                                               boolean revocable) {
+  synchronized Optional<HostOffer> getMatching(
+      Protos.AgentID slaveId,
+      ResourceRequest resourceRequest) {
 
-    Optional<HostOffer> optionalOffer = get(slaveId);
-    if (optionalOffer.isPresent()) {
-      HostOffer offer = optionalOffer.get();
-
-      if (isGloballyBanned(offer)
-          || isVetoed(offer, resourceRequest, revocable, Optional.empty())) {
-
-        return Optional.empty();
-      }
-    }
-
-    return optionalOffer;
+    return get(slaveId)
+        .filter(offer -> !isGloballyBanned(offer))
+        .filter(offer -> !isVetoed(offer, resourceRequest, Optional.empty()));
   }
 
   /**
@@ -182,14 +173,13 @@ class HostOffers {
    * @return The offers a given task group can use.
    */
   synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                                  ResourceRequest resourceRequest,
-                                                  boolean revocable) {
+                                                  ResourceRequest resourceRequest) {
 
     return Iterables.unmodifiableIterable(FluentIterable.from(offers)
         .filter(o -> !isGloballyBanned(o))
         .filter(o -> !isStaticallyBanned(o, groupKey))
         .filter(HostOffer::hasCpuAndMem)
-        .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+        .filter(o -> !isVetoed(o, resourceRequest, Optional.of(groupKey))));
   }
 
   private synchronized boolean isGloballyBanned(HostOffer offer) {
@@ -207,11 +197,10 @@ class HostOffers {
    */
   private boolean isVetoed(HostOffer offer,
                            ResourceRequest resourceRequest,
-                           boolean revocable,
                            Optional<TaskGroupKey> groupKey) {
 
     vetoEvaluatedOffers.incrementAndGet();
-    UnusedResource unusedResource = new UnusedResource(offer, revocable);
+    UnusedResource unusedResource = new UnusedResource(offer, resourceRequest.isRevocable());
     Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
     if (!vetoes.isEmpty()) {
       if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e90de3e..8f9e33d 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -81,12 +81,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param slaveId Slave ID to get the offer for.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Optional<HostOffer> getMatching(AgentID slaveId,
-                                  ResourceRequest resourceRequest,
-                                  boolean revocable);
+  Optional<HostOffer> getMatching(AgentID slaveId, ResourceRequest resourceRequest);
 
   /**
    * Gets all offers that the scheduler is holding that satisfy the supplied
@@ -94,12 +91,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                     ResourceRequest resourceRequest,
-                                     boolean revocable);
+  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, ResourceRequest resourceRequest);
 
   /**
    * Launches the task matched against the offer.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
index 8e806b7..084b48c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -162,19 +162,15 @@ public class OfferManagerImpl implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
-    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
+    return hostOffers.getMatching(slaveId, resourceRequest);
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+    return hostOffers.getAllMatching(groupKey, resourceRequest);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 69b6866..780689e 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -17,6 +17,7 @@ import java.util.Objects;
 
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -54,8 +55,8 @@ public final class PreemptionVictim {
     return task.getTask().getPriority();
   }
 
-  public ResourceBag getResourceBag() {
-    return ResourceManager.bagFromResources(task.getTask().getResources());
+  public ResourceBag getResourceBag(ExecutorSettings executorSettings) {
+    return ResourceManager.bagFromTask(task.getTask(), executorSettings);
   }
 
   public String getTaskId() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index cf6d348..569cfe6 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -113,13 +112,7 @@ public interface PreemptionVictimFilter {
         new Function<PreemptionVictim, ResourceBag>() {
           @Override
           public ResourceBag apply(PreemptionVictim victim) {
-            ResourceBag bag = victim.getResourceBag();
-
-            if (victim.getConfig().isSetExecutorConfig()) {
-              // Be pessimistic about revocable resource available if config is not available
-              bag.add(executorSettings.getExecutorOverhead(
-                  victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY));
-            }
+            ResourceBag bag = victim.getResourceBag(executorSettings);
 
             if (tierManager.getTier(victim.getConfig()).isRevocable()) {
               // Revocable task CPU cannot be used for preemption purposes as it's a compressible
@@ -223,10 +216,8 @@ public interface PreemptionVictimFilter {
         return Optional.empty();
       }
 
-      ResourceBag overhead = pendingTask.isSetExecutorConfig()
-          ? executorSettings.getExecutorOverhead(
-              pendingTask.getExecutorConfig().getName()).orElse(EMPTY)
-          : EMPTY;
+      ResourceRequest requiredResources =
+          ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager);
 
       ResourceBag totalResource = slackResources;
       for (PreemptionVictim victim : sortedVictims) {
@@ -240,10 +231,7 @@ public interface PreemptionVictimFilter {
 
         Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get(), unavailability),
-            new ResourceRequest(
-                pendingTask,
-                ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),
-                jobState));
+            requiredResources);
 
         if (vetoes.isEmpty()) {
           return Optional.of(ImmutableSet.copyOf(toPreemptTasks));

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index d093753..2bf9808 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,6 +26,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
@@ -245,6 +246,11 @@ public final class ResourceManager {
     return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE);
   }
 
+  public static ResourceBag bagFromTask(ITaskConfig task, ExecutorSettings executorSettings) {
+    return bagFromResources(task.getResources(), RESOURCE_TO_TYPE, QUANTIFY_RESOURCE)
+        .add(executorSettings.getExecutorOverhead(task));
+  }
+
   /**
    * Creates a {@link ResourceBag} from Mesos resources.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
index 87619b5..d2597a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -41,6 +41,6 @@ public interface TaskAssigner {
       MutableStoreProvider storeProvider,
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
+      Set<IAssignedTask> tasks,
       Map<String, TaskGroupKey> preemptionReservations);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
index 54bd177..ec416cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -21,22 +22,22 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager.LaunchException;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
@@ -64,7 +65,6 @@ public class TaskAssignerImpl implements TaskAssigner {
   private final StateManager stateManager;
   private final MesosTaskFactory taskFactory;
   private final OfferManager offerManager;
-  private final TierManager tierManager;
   private final UpdateAgentReserver updateAgentReserver;
   private final OfferSelector offerSelector;
 
@@ -73,7 +73,6 @@ public class TaskAssignerImpl implements TaskAssigner {
       StateManager stateManager,
       MesosTaskFactory taskFactory,
       OfferManager offerManager,
-      TierManager tierManager,
       UpdateAgentReserver updateAgentReserver,
       StatsProvider statsProvider,
       OfferSelector offerSelector) {
@@ -81,7 +80,6 @@ public class TaskAssignerImpl implements TaskAssigner {
     this.stateManager = requireNonNull(stateManager);
     this.taskFactory = requireNonNull(taskFactory);
     this.offerManager = requireNonNull(offerManager);
-    this.tierManager = requireNonNull(tierManager);
     this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
     this.offerSelector = requireNonNull(offerSelector);
@@ -99,7 +97,7 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private Protos.TaskInfo assign(
-      Storage.MutableStoreProvider storeProvider,
+      MutableStoreProvider storeProvider,
       Protos.Offer offer,
       String taskId,
       boolean revocable) {
@@ -118,20 +116,18 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private void launchUsingOffer(
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
+      MutableStoreProvider stores,
       ResourceRequest resourceRequest,
       IAssignedTask task,
-      HostOffer offer,
-      ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+      HostOffer offer) throws LaunchException {
 
     String taskId = task.getTaskId();
-    Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+    Protos.TaskInfo taskInfo =
+        assign(stores, offer.getOffer(), taskId, resourceRequest.isRevocable());
     resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
     try {
       offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-      assignmentResult.add(taskId);
-    } catch (OfferManager.LaunchException e) {
+    } catch (LaunchException e) {
       LOG.warn("Failed to launch task.", e);
       launchFailures.incrementAndGet();
 
@@ -139,147 +135,144 @@ public class TaskAssignerImpl implements TaskAssigner {
       // It is in the LOST state and a new task will move to PENDING to replace it.
       // Should the state change fail due to storage issues, that's okay.  The task will
       // time out in the ASSIGNED state and be moved to LOST.
-      stateManager.changeState(
-          storeProvider,
-          taskId,
-          Optional.of(PENDING),
-          LOST,
-          LAUNCH_FAILED_MSG);
+      stateManager.changeState(stores, taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
       throw e;
     }
   }
 
-  private Iterable<IAssignedTask> maybeAssignReserved(
-      Iterable<IAssignedTask> tasks,
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      ImmutableSet.Builder<String> assignmentResult) {
+  private static final class ReservationStatus {
+    final boolean taskReserving;
+    final Optional<HostOffer> offer;
 
-    if (!updateAgentReserver.hasReservations(groupKey)) {
-      return tasks;
+    private ReservationStatus(boolean taskReserving, Optional<HostOffer> offer) {
+      this.taskReserving = taskReserving;
+      this.offer = requireNonNull(offer);
     }
 
-    // Data structure to record which tasks should be excluded from the regular (non-reserved)
-    // scheduling loop. This is important because we release reservations once they are used,
-    // so we need to record them separately to avoid them being double-scheduled.
-    ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
-    for (IAssignedTask task : tasks) {
-      IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
-      Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
-      if (maybeAgentId.isPresent()) {
-        excludeBuilder.add(key);
-        Optional<HostOffer> offer = offerManager.getMatching(
-            Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
-            resourceRequest,
-            revocable);
-        if (offer.isPresent()) {
-          try {
-            // The offer can still be veto'd because of changed constraints, or because the
-            // Scheduler hasn't been updated by Mesos yet...
-            launchUsingOffer(storeProvider,
-                revocable,
-                resourceRequest,
-                task,
-                offer.get(),
-                assignmentResult);
-            LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          } catch (OfferManager.LaunchException e) {
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          }
-        } else {
-          LOG.info(
-              "Tried to reuse offer on {} for {}, but was not ready yet.",
-              maybeAgentId.get(),
-              key);
-        }
-      }
+    static final ReservationStatus NOT_RESERVING = new ReservationStatus(false, Optional.empty());
+    static final ReservationStatus NOT_READY = new ReservationStatus(true, Optional.empty());
+
+    static ReservationStatus ready(HostOffer offer) {
+      return new ReservationStatus(true, Optional.of(offer));
     }
 
-    // Return only the tasks that didn't have reservations. Offers on agents that were reserved
-    // might not have been seen by Aurora yet, so we need to wait until the reservation expires
-    // before giving up and falling back to the first-fit algorithm.
-    Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
-    return Iterables.filter(tasks, t -> !toBeExcluded.contains(
-        InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+    boolean isTaskReserving() {
+      return taskReserving;
+    }
+
+    Optional<HostOffer> getOffer() {
+      return offer;
+    }
+  }
+
+  private ReservationStatus getReservation(IAssignedTask task, ResourceRequest resourceRequest) {
+
+    IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+    Optional<String> agentId = updateAgentReserver.getAgent(key);
+    if (!agentId.isPresent()) {
+      return ReservationStatus.NOT_RESERVING;
+    }
+    Optional<HostOffer> offer = offerManager.getMatching(
+        Protos.AgentID.newBuilder().setValue(agentId.get()).build(),
+        resourceRequest);
+    if (offer.isPresent()) {
+      LOG.info("Used update reservation for {} on {}", key, agentId.get());
+      updateAgentReserver.release(agentId.get(), key);
+      return ReservationStatus.ready(offer.get());
+    } else {
+      LOG.info(
+          "Tried to reuse offer on {} for {}, but was not ready yet.",
+          agentId.get(),
+          key);
+      return ReservationStatus.NOT_READY;
+    }
   }
 
   /**
    * Determine whether or not the offer is reserved for a different task via preemption or
    * update affinity.
    */
-  @SuppressWarnings("PMD.UselessParentheses")  // TODO(jly): PMD bug, remove when upgrade from 5.5.3
-  private boolean isAgentReserved(HostOffer offer,
-                                  TaskGroupKey groupKey,
-                                  Map<String, TaskGroupKey> preemptionReservations) {
+  private boolean isAgentReserved(
+      HostOffer offer,
+      TaskGroupKey groupKey,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
     String agentId = offer.getOffer().getAgentId().getValue();
-    Optional<TaskGroupKey> reservedGroup = Optional.ofNullable(
-        preemptionReservations.get(agentId));
+    boolean reservedForPreemption = Optional.ofNullable(preemptionReservations.get(agentId))
+        .map(group -> !group.equals(groupKey))
+        .orElse(false);
 
-    return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
-        || !updateAgentReserver.getReservations(agentId).isEmpty();
+    return reservedForPreemption || updateAgentReserver.isReserved(agentId);
   }
 
-  @Timed("assigner_maybe_assign")
-  @Override
-  public Set<String> maybeAssign(
-      Storage.MutableStoreProvider storeProvider,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
-      Map<String, TaskGroupKey> preemptionReservations) {
+  private static class SchedulingMatch {
+    final IAssignedTask task;
+    final HostOffer offer;
 
-    if (Iterables.isEmpty(tasks)) {
-      return ImmutableSet.of();
+    SchedulingMatch(IAssignedTask task, HostOffer offer) {
+      this.task = requireNonNull(task);
+      this.offer = requireNonNull(offer);
     }
+  }
 
-    boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
-    ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+  private Collection<SchedulingMatch> findMatches(
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
-    // Assign tasks reserved for a specific agent (e.g. for update affinity)
-    Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
-        tasks,
-        storeProvider,
-        revocable,
-        resourceRequest,
-        groupKey,
-        assignmentResult);
+    // Avoid matching multiple tasks against any offer.
+    Map<String, SchedulingMatch> matchesByOffer = Maps.newHashMap();
 
-    // Assign the rest of the non-reserved tasks
-    for (IAssignedTask task : nonReservedTasks) {
-      try {
+    tasks.forEach(task -> {
+      ReservationStatus reservation = getReservation(task, resourceRequest);
+      Optional<HostOffer> chosenOffer;
+      if (reservation.isTaskReserving()) {
+        // Use the reserved offer, which may not currently exist.
+        chosenOffer = reservation.getOffer();
+      } else {
         // Get all offers that will satisfy the given ResourceRequest and that are not reserved
         // for updates or preemption
-        FluentIterable<HostOffer> matchingOffers = FluentIterable
-            .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
-            .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+        Iterable<HostOffer> matchingOffers = Iterables.filter(
+            offerManager.getAllMatching(groupKey, resourceRequest),
+            o -> !matchesByOffer.containsKey(o.getOffer().getId().getValue())
+                && !isAgentReserved(o, groupKey, preemptionReservations));
 
         // Determine which is the optimal offer to select for the given request
-        Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
-
-        // If no offer is chosen, continue to the next task
-        if (!optionalOffer.isPresent()) {
-          continue;
-        }
-
-        // Attempt to launch the task using the chosen offer
-        HostOffer offer = optionalOffer.get();
-        launchUsingOffer(storeProvider,
-            revocable,
-            resourceRequest,
-            task,
-            offer,
-            assignmentResult);
-      } catch (OfferManager.LaunchException e) {
+        chosenOffer = offerSelector.select(matchingOffers, resourceRequest);
+      }
+
+      chosenOffer.ifPresent(hostOffer -> {
+        matchesByOffer.put(
+            hostOffer.getOffer().getId().getValue(),
+            new SchedulingMatch(task, hostOffer));
+      });
+    });
+
+    return matchesByOffer.values();
+  }
+
+  @Timed("assigner_maybe_assign")
+  @Override
+  public Set<String> maybeAssign(
+      MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> reservations) {
+
+    ImmutableSet.Builder<String> assigned = ImmutableSet.builder();
+
+    for (SchedulingMatch match : findMatches(resourceRequest, groupKey, tasks, reservations)) {
+      try {
+        launchUsingOffer(storeProvider, resourceRequest, match.task, match.offer);
+        assigned.add(match.task.getTaskId());
+      } catch (LaunchException e) {
         // Any launch exception causes the scheduling round to terminate for this TaskGroup.
         break;
       }
     }
 
-    return assignmentResult.build();
+    return assigned.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 3c38f95..d2f3257 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -31,5 +31,5 @@ public interface TaskScheduler extends EventSubscriber {
    * @return Successfully scheduled task IDs. The caller should call schedule again if a given
    *         task ID was not present in the result.
    */
-  Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
+  Set<String> schedule(MutableStoreProvider storeProvider, Set<String> taskIds);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
index cff4ab1..edab03d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.inject.Inject;
@@ -26,7 +27,6 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -34,16 +34,17 @@ import com.google.common.eventbus.Subscribe;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -55,10 +56,8 @@ import static java.lang.annotation.ElementType.METHOD;
 import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toMap;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
 /**
  * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -81,6 +80,7 @@ public class TaskSchedulerImpl implements TaskScheduler {
   private final TaskAssigner assigner;
   private final Preemptor preemptor;
   private final ExecutorSettings executorSettings;
+  private final TierManager tierManager;
   private final BiCache<String, TaskGroupKey> reservations;
 
   private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
@@ -92,16 +92,18 @@ public class TaskSchedulerImpl implements TaskScheduler {
       TaskAssigner assigner,
       Preemptor preemptor,
       ExecutorSettings executorSettings,
+      TierManager tierManager,
       BiCache<String, TaskGroupKey> reservations) {
 
     this.assigner = requireNonNull(assigner);
     this.preemptor = requireNonNull(preemptor);
     this.executorSettings = requireNonNull(executorSettings);
+    this.tierManager = requireNonNull(tierManager);
     this.reservations = requireNonNull(reservations);
   }
 
   @Timed("task_schedule_attempt")
-  public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+  public Set<String> schedule(MutableStoreProvider store, Set<String> taskIds) {
     try {
       return scheduleTasks(store, taskIds);
     } catch (RuntimeException e) {
@@ -115,77 +117,67 @@ public class TaskSchedulerImpl implements TaskScheduler {
     }
   }
 
-  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
-    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
-    String taskIdValues = Joiner.on(",").join(taskIds);
-    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
-    ImmutableSet<IAssignedTask> assignedTasks =
-        ImmutableSet.copyOf(Iterables.transform(
-            store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
-            IScheduledTask::getAssignedTask));
-
-    if (Iterables.isEmpty(assignedTasks)) {
-      LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
-      return taskIds;
+  private Map<String, IAssignedTask> fetchTasks(StoreProvider store, Set<String> ids) {
+    Map<String, IAssignedTask> tasks = store.getTaskStore()
+        .fetchTasks(Query.taskScoped(ids).byStatus(PENDING))
+        .stream()
+        .map(IScheduledTask::getAssignedTask)
+        .collect(Collectors.toMap(
+            IAssignedTask::getTaskId,
+            Function.identity()
+        ));
+
+    if (ids.size() != tasks.size()) {
+      LOG.warn("Failed to look up tasks "
+          + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
     }
+    return tasks;
+  }
 
-    Preconditions.checkState(
-        assignedTasks.stream()
-            .collect(Collectors.groupingBy(t -> t.getTask()))
-            .entrySet()
-            .size() == 1,
-        "Found multiple task groups for %s",
-        taskIdValues);
-
-    Map<String, IAssignedTask> assignableTaskMap =
-        assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+  private Set<String> scheduleTasks(MutableStoreProvider store, Set<String> ids) {
+    LOG.debug("Attempting to schedule tasks {}", ids);
+    Map<String, IAssignedTask> tasksById = fetchTasks(store, ids);
 
-    if (taskIds.size() != assignedTasks.size()) {
-      LOG.warn("Failed to look up tasks "
-          + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+    if (tasksById.isEmpty()) {
+      // None of the tasks were found in storage.  This could be caused by a task group that was
+      // killed by the user, for example.
+      return ids;
     }
 
-    // This is safe after all checks above.
-    ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+    // Prepare scheduling context for the tasks
+    ITaskConfig task = Iterables.getOnlyElement(tasksById.values().stream()
+        .map(IAssignedTask::getTask)
+        .collect(Collectors.toSet()));
     AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
 
-    // Valid Docker tasks can have a container but no executor config
-    ResourceBag overhead = ResourceBag.EMPTY;
-    if (task.isSetExecutorConfig()) {
-      overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-          .orElseThrow(
-              () -> new IllegalArgumentException("Cannot find executor configuration"));
-    }
-
+    // Attempt to schedule using available resources.
     Set<String> launched = assigner.maybeAssign(
         store,
-        new SchedulingFilter.ResourceRequest(
-            task,
-            bagFromResources(task.getResources()).add(overhead), aggregate),
+        ResourceRequest.fromTask(task, executorSettings, aggregate, tierManager),
         TaskGroupKey.from(task),
-        assignedTasks,
+        ImmutableSet.copyOf(tasksById.values()),
         reservations.asMap());
 
-    attemptsFired.addAndGet(assignableTaskMap.size());
-    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+    attemptsFired.addAndGet(tasksById.size());
 
-    failedToLaunch.forEach(taskId -> {
-      // Task could not be scheduled.
+    // Fall back to preemption for tasks not scheduled above.
+    Set<String> unassigned = Sets.difference(tasksById.keySet(), launched);
+    unassigned.forEach(taskId -> {
       // TODO(maxim): Now that preemption slots are searched asynchronously, consider
       // retrying a launch attempt within the current scheduling round IFF a reservation is
       // available.
-      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+      maybePreemptFor(tasksById.get(taskId), aggregate, store);
     });
-    attemptsNoMatch.addAndGet(failedToLaunch.size());
+    attemptsNoMatch.addAndGet(unassigned.size());
 
     // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
-    return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+    return Sets.union(launched, Sets.difference(ids, tasksById.keySet()));
   }
 
   private void maybePreemptFor(
       IAssignedTask task,
       AttributeAggregate jobState,
-      Storage.MutableStoreProvider storeProvider) {
+      MutableStoreProvider storeProvider) {
 
     if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
       return;

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 1219215..ebb345d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage;
 
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Set;
 
@@ -47,7 +48,7 @@ public interface TaskStore {
    * @param query Builder of the query to identify tasks with.
    * @return A read-only view of matching tasks.
    */
-  Iterable<IScheduledTask> fetchTasks(Query.Builder query);
+  Collection<IScheduledTask> fetchTasks(Query.Builder query);
 
   /**
    * Fetches all job keys represented in the task store.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
index dea8e69..8d70cae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.durability;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -327,7 +328,7 @@ public class WriteRecorder implements
   }
 
   @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+  public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
     return this.taskStore.fetchTasks(query);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
index b6909a6..59eca7b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
@@ -14,12 +14,9 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Optional;
-import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.slf4j.Logger;
@@ -58,21 +55,13 @@ public interface UpdateAgentReserver {
   Optional<String> getAgent(IInstanceKey key);
 
   /**
-   * Get all reservations for a given agent id. Useful for skipping over the agent between the
+   * Checks whether an agent is currently reserved. Useful for skipping over the agent between the
    * reserve/release window.
    *
    * @param agentId The agent id to look up reservations for.
    * @return A set of keys reserved for that agent.
    */
-  Set<IInstanceKey> getReservations(String agentId);
-
-  /**
-   * Check if the agent reserver has any reservations for the provided key.
-   *
-   * @param groupKey The key to check.
-   * @return True if there are reservations against any instances in that key.
-   */
-  boolean hasReservations(TaskGroupKey groupKey);
+  boolean isReserved(String agentId);
 
   /**
    * Implementation of the update reserver backed by a BiCache (the same mechanism we use for
@@ -99,16 +88,9 @@ public interface UpdateAgentReserver {
       cache.remove(key, agentId);
     }
 
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return cache.getByValue(agentId);
-    }
-
     @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
-      return cache.asMap().entrySet().stream()
-          .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob()))
-          .findFirst()
-          .isPresent();
+    public boolean isReserved(String agentId) {
+      return !cache.getByValue(agentId).isEmpty();
     }
 
     @Override
@@ -137,12 +119,7 @@ public interface UpdateAgentReserver {
     }
 
     @Override
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return ImmutableSet.of();
-    }
-
-    @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
+    public boolean isReserved(String agentId) {
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
index 82e40d5..f116e3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
@@ -48,6 +48,13 @@ public class TierManagerTest {
   }
 
   @Test
+  public void testDefaultTier() {
+    assertEquals(
+        DEV_TIER,
+        TIER_MANAGER.getTier(ITaskConfig.build(new TaskConfig())));
+  }
+
+  @Test
   public void testGetTierRevocableAndProduction() {
     assertEquals(
         REVOCABLE_TIER,

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 64d7a44..7136711 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -23,13 +23,12 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.TaskVars;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -53,8 +52,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final UnusedResource RESOURCE = new UnusedResource(
       ResourceManager.bagFromResources(TASK.getResources()),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
-  private static final ResourceRequest REQUEST =
-      new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
+  private static final ResourceRequest REQUEST = TaskTestUtil.toResourceRequest(TASK);
 
   private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0de90d7..21d4e47 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -38,14 +38,13 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
-import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -57,8 +56,10 @@ import org.junit.Test;
 import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -135,22 +136,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(noPortTask, bag(noPortTask), empty())));
+            TaskTestUtil.toResourceRequest(noPortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(onePortTask, bag(onePortTask), empty())));
+            TaskTestUtil.toResourceRequest(onePortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
+            TaskTestUtil.toResourceRequest(twoPortTask)));
     assertEquals(
         ImmutableSet.of(veto(PORTS, 1)),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(threePortTask, bag(threePortTask), empty())));
+            TaskTestUtil.toResourceRequest(threePortTask)));
   }
 
   @Test
@@ -238,13 +239,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(Veto.maintenance("draining")),
-        defaultFilter.filter(unusedResource, request));
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -262,14 +262,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(),
-        defaultFilter.filter(unusedResource, request));
-
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -350,7 +348,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testLimitWithinJob() throws Exception {
+  public void testLimitWithinJob() {
     control.replay();
 
     AttributeAggregate stateA = AttributeAggregate.create(
@@ -465,7 +463,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.of(),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(task, bag(task), empty())));
+            TaskTestUtil.toResourceRequest(task)));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -577,7 +575,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), aggregate))
+            TaskTestUtil.toResourceRequest(task))
             .isEmpty());
 
     Constraint negated = constraint.deepCopy();
@@ -587,7 +585,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         !expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(negatedTask, bag(negatedTask), aggregate))
+            ResourceRequest.fromTask(negatedTask, NO_OVERHEAD_EXECUTOR, aggregate, TIER_MANAGER))
             .isEmpty());
     return task;
   }
@@ -618,7 +616,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.copyOf(vetoes),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), jobState)));
+            ResourceRequest.fromTask(task, NO_OVERHEAD_EXECUTOR, jobState, TIER_MANAGER)));
   }
 
   private static IHostAttributes hostAttributes(
@@ -686,10 +684,4 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig makeTask() {
     return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
   }
-
-  private ResourceBag bag(ITaskConfig task) {
-    return ResourceManager.bagFromResources(task.getResources())
-        .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead(
-            task.getExecutorConfig().getName()).get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index c27a662..686087e 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -124,15 +125,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
       .setAgentId(SLAVE)
       .setHostname("slave-hostname")
       .addAllResources(mesosScalarFromBag(bagFromResources(
-              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(
-                  TASK_CONFIG.getExecutorConfig().getName()).get())))
+              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
   private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
       .clearResources()
       .addAllResources(mesosScalarFromBag(bagFromResources(
-          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(
-              TASK_CONFIG.getExecutorConfig().getName()).get())))
+          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
 
@@ -282,10 +281,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     ResourceBag executorResources =
         bagFromMesosResources(taskInfo.getExecutor().getResourcesList());
 
-    assertEquals(
-        bagFromResources(task.getResources()).add(
-            config.getExecutorOverhead(task.getExecutorConfig().getName()).get()),
-        taskResources.add(executorResources));
+    assertEquals(ResourceManager.bagFromTask(task, config), taskResources.add(executorResources));
   }
 
   private void checkDiscoveryInfoUnset(TaskInfo taskInfo) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 1e9532b..28224a5 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.common.util.testing.FakeTicker;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -37,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -57,7 +57,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
@@ -100,10 +99,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final int PORT = 1000;
   private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
   private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
-      TASK.getAssignedTask().getTask(),
-      ResourceBag.EMPTY,
-      empty());
+  private static final ResourceRequest EMPTY_REQUEST =
+      TaskTestUtil.toResourceRequest(TASK.getAssignedTask().getTask());
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
@@ -250,14 +247,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
 
     // Add static ban.
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -269,14 +266,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban expires after maximum amount of time an offer is held.
     FAKE_TICKER.advance(RETURN_DELAY);
     offerManager.cleanupStaticBans();
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -289,7 +286,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
@@ -297,7 +294,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -361,14 +358,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
 
     offerManager.cancel(OFFER_A_ID);
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -423,7 +420,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -460,7 +457,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -488,7 +485,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -516,7 +513,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -555,7 +552,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -628,7 +625,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     control.replay();
     offerManager.add(OFFER_A);
     assertEquals(Optional.of(OFFER_A),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
   }
 
   @Test
@@ -640,7 +637,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_A_ID);
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
 
@@ -655,7 +652,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -669,7 +666,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_C);
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
   }
 
@@ -685,7 +682,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_B.getOffer().getId());
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
@@ -702,7 +699,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
@@ -727,7 +724,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(empty, OFFER_A),
         ImmutableSet.copyOf(offerManager.getAll()));
@@ -750,7 +747,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_B),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),