You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2018/01/09 22:51:10 UTC

[2/2] aurora git commit: Refactor scheduling code to split matching and assigning phases

Refactor scheduling code to split matching and assigning phases

This patch sets the stage for performing the bulk of scheduling work in a
separate call path, without holding the write lock.

Also included is a mechanical refactor pushing the `revocable` flag into
`ResourceRequest` (which was ~always needed as a sibling parameter).

Reviewed at https://reviews.apache.org/r/64954/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e6242fe
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e6242fe
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e6242fe

Branch: refs/heads/master
Commit: 4e6242fed68650e7be906ec3d17ae750f49f8cb8
Parents: 5b34231
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 9 14:50:51 2018 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 9 14:50:51 2018 -0800

----------------------------------------------------------------------
 .../common/testing/easymock/EasyMockTest.java   |  14 +-
 .../benchmark/fakes/FakeOfferManager.java       |  12 +-
 .../apache/aurora/scheduler/TierManager.java    |   4 +-
 .../aurora/scheduler/base/TaskTestUtil.java     |  14 +-
 .../executor/ExecutorSettings.java              |  21 +-
 .../scheduler/filter/SchedulingFilter.java      |  42 +++-
 .../scheduler/mesos/MesosTaskFactory.java       |   7 +-
 .../aurora/scheduler/offers/HostOffers.java     |  29 +--
 .../aurora/scheduler/offers/OfferManager.java   |  10 +-
 .../scheduler/offers/OfferManagerImpl.java      |  12 +-
 .../scheduler/preemptor/PreemptionVictim.java   |   5 +-
 .../preemptor/PreemptionVictimFilter.java       |  20 +-
 .../scheduler/resources/ResourceManager.java    |   6 +
 .../scheduler/scheduling/TaskAssigner.java      |   2 +-
 .../scheduler/scheduling/TaskAssignerImpl.java  | 237 +++++++++----------
 .../scheduler/scheduling/TaskScheduler.java     |   2 +-
 .../scheduler/scheduling/TaskSchedulerImpl.java | 100 ++++----
 .../aurora/scheduler/storage/TaskStore.java     |   3 +-
 .../storage/durability/WriteRecorder.java       |   3 +-
 .../scheduler/updater/UpdateAgentReserver.java  |  33 +--
 .../aurora/scheduler/TierManagerTest.java       |   7 +
 .../events/NotifyingSchedulingFilterTest.java   |   6 +-
 .../filter/SchedulingFilterImplTest.java        |  36 ++-
 .../mesos/MesosTaskFactoryImplTest.java         |  12 +-
 .../scheduler/offers/OfferManagerImplTest.java  |  51 ++--
 .../preemptor/PreemptionVictimFilterTest.java   | 101 ++------
 .../scheduling/FirstFitOfferSelectorTest.java   |   9 +-
 .../scheduling/TaskAssignerImplTest.java        | 170 ++++++-------
 .../scheduling/TaskSchedulerImplTest.java       |  56 ++---
 .../updater/NullAgentReserverTest.java          |   3 +-
 .../updater/UpdateAgentReserverImplTest.java    |  45 +---
 31 files changed, 476 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
index c5500b3..15fc677 100644
--- a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
+++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
@@ -26,6 +26,9 @@ import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
 import static org.easymock.EasyMock.createControl;
 
@@ -38,6 +41,16 @@ import static org.easymock.EasyMock.createControl;
 public abstract class EasyMockTest extends TearDownTestCase {
   protected IMocksControl control;
 
+  @Rule
+  public final TestWatcher verifyControl = new TestWatcher() {
+    @Override
+    protected void succeeded(Description description) {
+      // Only attempt to verify the control when the test case otherwise succeeded.  This prevents
+      // spurious mock-related error messages that distract from the real error.
+      control.verify();
+    }
+  };
+
   /**
    * Creates an EasyMock {@link #control} for tests to use that will be automatically
    * {@link IMocksControl#verify() verified} on tear down.
@@ -45,7 +58,6 @@ public abstract class EasyMockTest extends TearDownTestCase {
   @Before
   public final void setupEasyMock() {
     control = createControl();
-    addTearDown(() -> control.verify());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f0dacd4..0a105c7 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -15,6 +15,8 @@ package org.apache.aurora.benchmark.fakes;
 
 import java.util.Optional;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -59,18 +61,14 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
     return Optional.empty();
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return null;
+    return ImmutableList.of();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/TierManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java
index c6ad2b1..a37fea4 100644
--- a/src/main/java/org/apache/aurora/scheduler/TierManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.Map;
+import java.util.Optional;
 
 import javax.inject.Inject;
 
@@ -102,7 +103,8 @@ public interface TierManager {
           !taskConfig.isSetTier() || tierConfig.tiers.containsKey(taskConfig.getTier()),
           "Invalid tier '%s' in TaskConfig.", taskConfig.getTier());
 
-      return tierConfig.tiers.get(taskConfig.getTier());
+      return tierConfig.tiers.get(
+          Optional.ofNullable(taskConfig.getTier()).orElse(tierConfig.defaultTier));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 22d5a64..2b61c27 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,6 +47,8 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -70,12 +72,14 @@ public final class TaskTestUtil {
       new TierInfo(true /* preemptible */, false /* revocable */);
   public static final TierInfo PREFERRED_TIER =
       new TierInfo(false /* preemptible */, false /* revocable */);
+  public static final String REVOCABLE_TIER_NAME = "tier-revocable";
   public static final String PROD_TIER_NAME = "tier-prod";
   public static final String DEV_TIER_NAME = "tier-dev";
   public static final TierConfig TIER_CONFIG =
       new TierConfig(DEV_TIER_NAME, ImmutableMap.of(
           PROD_TIER_NAME, PREFERRED_TIER,
-          DEV_TIER_NAME, DEV_TIER
+          DEV_TIER_NAME, DEV_TIER,
+          REVOCABLE_TIER_NAME, REVOCABLE_TIER
       ));
   public static final TierManager TIER_MANAGER = new TierManager.TierManagerImpl(TIER_CONFIG);
   public static final ThriftBackfill THRIFT_BACKFILL = new ThriftBackfill(TIER_MANAGER);
@@ -237,4 +241,12 @@ public final class TaskTestUtil {
         new org.apache.aurora.gen.TierConfig("revocable", REVOCABLE_TIER.toMap())
     );
   }
+
+  public static ResourceRequest toResourceRequest(ITaskConfig task) {
+    return ResourceRequest.fromTask(
+        task,
+        EXECUTOR_SETTINGS,
+        AttributeAggregate.empty(),
+        TIER_MANAGER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
index 5c987fd..dac84e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
@@ -19,6 +19,9 @@ import java.util.Optional;
 
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -26,6 +29,9 @@ import static java.util.Objects.requireNonNull;
  * Configuration for the executor to run, and resource overhead required for it.
  */
 public class ExecutorSettings {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorSettings.class);
+
   private final Map<String, ExecutorConfig> config;
   private final boolean populateDiscoveryInfo;
 
@@ -45,12 +51,19 @@ public class ExecutorSettings {
     return populateDiscoveryInfo;
   }
 
-  public Optional<ResourceBag> getExecutorOverhead(String name) {
+  public ResourceBag getExecutorOverhead(ITaskConfig task) {
+    if (!task.isSetExecutorConfig()) {
+      // Docker-based tasks don't need executors
+      return ResourceBag.EMPTY;
+    }
+
+    String name = task.getExecutorConfig().getName();
     if (config.containsKey(name)) {
-      return Optional.of(
-          ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList()));
+      return ResourceManager.bagFromMesosResources(
+          config.get(name).getExecutor().getResourcesList());
     } else {
-      return Optional.empty();
+      LOG.warn("No executor configuration found for " + name);
+      return ResourceBag.EMPTY;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index bd41590..fd97259 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,12 +21,17 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED;
@@ -306,11 +311,31 @@ public interface SchedulingFilter {
     private final ITaskConfig task;
     private final ResourceBag request;
     private final AttributeAggregate jobState;
+    private final boolean revocable;
+
+    private ResourceRequest(
+        ITaskConfig task,
+        ResourceBag request,
+        AttributeAggregate jobState,
+        boolean revocable) {
 
-    public ResourceRequest(ITaskConfig task, ResourceBag request, AttributeAggregate jobState) {
-      this.task = task;
-      this.request = request;
-      this.jobState = jobState;
+      this.task = requireNonNull(task);
+      this.request = requireNonNull(request);
+      this.jobState = requireNonNull(jobState);
+      this.revocable = revocable;
+    }
+
+    public static ResourceRequest fromTask(
+        ITaskConfig task,
+        ExecutorSettings executorSettings,
+        AttributeAggregate jobState,
+        TierManager tierManager) {
+
+      return new ResourceRequest(
+          task,
+          ResourceManager.bagFromTask(task, executorSettings),
+          jobState,
+          tierManager.getTier(task).isRevocable());
     }
 
     public Iterable<IConstraint> getConstraints() {
@@ -329,6 +354,10 @@ public interface SchedulingFilter {
       return jobState;
     }
 
+    public boolean isRevocable() {
+      return revocable;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ResourceRequest)) {
@@ -338,12 +367,13 @@ public interface SchedulingFilter {
       ResourceRequest other = (ResourceRequest) o;
       return Objects.equals(task, other.task)
           && Objects.equals(request, other.request)
-          && Objects.equals(jobState, other.jobState);
+          && Objects.equals(jobState, other.jobState)
+          && revocable == other.revocable;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(task, request, jobState);
+      return Objects.hash(task, request, jobState, revocable);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index cb288bb..bcb2bbf 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -152,12 +152,7 @@ public interface MesosTaskFactory {
 
       ITaskConfig config = task.getTask();
 
-      // Docker-based tasks don't need executors
-      ResourceBag executorOverhead = ResourceBag.EMPTY;
-      if (config.isSetExecutorConfig()) {
-        executorOverhead =
-            executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY);
-      }
+      ResourceBag executorOverhead = executorSettings.getExecutorOverhead(config);
 
       AcceptedOffer acceptedOffer;
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
index a01c0a8..2ea7a01 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -154,22 +154,13 @@ class HostOffers {
         .toSet();
   }
 
-  synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                               ResourceRequest resourceRequest,
-                                               boolean revocable) {
+  synchronized Optional<HostOffer> getMatching(
+      Protos.AgentID slaveId,
+      ResourceRequest resourceRequest) {
 
-    Optional<HostOffer> optionalOffer = get(slaveId);
-    if (optionalOffer.isPresent()) {
-      HostOffer offer = optionalOffer.get();
-
-      if (isGloballyBanned(offer)
-          || isVetoed(offer, resourceRequest, revocable, Optional.empty())) {
-
-        return Optional.empty();
-      }
-    }
-
-    return optionalOffer;
+    return get(slaveId)
+        .filter(offer -> !isGloballyBanned(offer))
+        .filter(offer -> !isVetoed(offer, resourceRequest, Optional.empty()));
   }
 
   /**
@@ -182,14 +173,13 @@ class HostOffers {
    * @return The offers a given task group can use.
    */
   synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                                  ResourceRequest resourceRequest,
-                                                  boolean revocable) {
+                                                  ResourceRequest resourceRequest) {
 
     return Iterables.unmodifiableIterable(FluentIterable.from(offers)
         .filter(o -> !isGloballyBanned(o))
         .filter(o -> !isStaticallyBanned(o, groupKey))
         .filter(HostOffer::hasCpuAndMem)
-        .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+        .filter(o -> !isVetoed(o, resourceRequest, Optional.of(groupKey))));
   }
 
   private synchronized boolean isGloballyBanned(HostOffer offer) {
@@ -207,11 +197,10 @@ class HostOffers {
    */
   private boolean isVetoed(HostOffer offer,
                            ResourceRequest resourceRequest,
-                           boolean revocable,
                            Optional<TaskGroupKey> groupKey) {
 
     vetoEvaluatedOffers.incrementAndGet();
-    UnusedResource unusedResource = new UnusedResource(offer, revocable);
+    UnusedResource unusedResource = new UnusedResource(offer, resourceRequest.isRevocable());
     Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
     if (!vetoes.isEmpty()) {
       if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e90de3e..8f9e33d 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -81,12 +81,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param slaveId Slave ID to get the offer for.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Optional<HostOffer> getMatching(AgentID slaveId,
-                                  ResourceRequest resourceRequest,
-                                  boolean revocable);
+  Optional<HostOffer> getMatching(AgentID slaveId, ResourceRequest resourceRequest);
 
   /**
    * Gets all offers that the scheduler is holding that satisfy the supplied
@@ -94,12 +91,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                     ResourceRequest resourceRequest,
-                                     boolean revocable);
+  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, ResourceRequest resourceRequest);
 
   /**
    * Launches the task matched against the offer.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
index 8e806b7..084b48c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -162,19 +162,15 @@ public class OfferManagerImpl implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
-    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
+    return hostOffers.getMatching(slaveId, resourceRequest);
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+    return hostOffers.getAllMatching(groupKey, resourceRequest);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 69b6866..780689e 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -17,6 +17,7 @@ import java.util.Objects;
 
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -54,8 +55,8 @@ public final class PreemptionVictim {
     return task.getTask().getPriority();
   }
 
-  public ResourceBag getResourceBag() {
-    return ResourceManager.bagFromResources(task.getTask().getResources());
+  public ResourceBag getResourceBag(ExecutorSettings executorSettings) {
+    return ResourceManager.bagFromTask(task.getTask(), executorSettings);
   }
 
   public String getTaskId() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index cf6d348..569cfe6 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -113,13 +112,7 @@ public interface PreemptionVictimFilter {
         new Function<PreemptionVictim, ResourceBag>() {
           @Override
           public ResourceBag apply(PreemptionVictim victim) {
-            ResourceBag bag = victim.getResourceBag();
-
-            if (victim.getConfig().isSetExecutorConfig()) {
-              // Be pessimistic about revocable resource available if config is not available
-              bag.add(executorSettings.getExecutorOverhead(
-                  victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY));
-            }
+            ResourceBag bag = victim.getResourceBag(executorSettings);
 
             if (tierManager.getTier(victim.getConfig()).isRevocable()) {
               // Revocable task CPU cannot be used for preemption purposes as it's a compressible
@@ -223,10 +216,8 @@ public interface PreemptionVictimFilter {
         return Optional.empty();
       }
 
-      ResourceBag overhead = pendingTask.isSetExecutorConfig()
-          ? executorSettings.getExecutorOverhead(
-              pendingTask.getExecutorConfig().getName()).orElse(EMPTY)
-          : EMPTY;
+      ResourceRequest requiredResources =
+          ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager);
 
       ResourceBag totalResource = slackResources;
       for (PreemptionVictim victim : sortedVictims) {
@@ -240,10 +231,7 @@ public interface PreemptionVictimFilter {
 
         Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get(), unavailability),
-            new ResourceRequest(
-                pendingTask,
-                ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),
-                jobState));
+            requiredResources);
 
         if (vetoes.isEmpty()) {
           return Optional.of(ImmutableSet.copyOf(toPreemptTasks));

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index d093753..2bf9808 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,6 +26,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
@@ -245,6 +246,11 @@ public final class ResourceManager {
     return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE);
   }
 
+  public static ResourceBag bagFromTask(ITaskConfig task, ExecutorSettings executorSettings) {
+    return bagFromResources(task.getResources(), RESOURCE_TO_TYPE, QUANTIFY_RESOURCE)
+        .add(executorSettings.getExecutorOverhead(task));
+  }
+
   /**
    * Creates a {@link ResourceBag} from Mesos resources.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
index 87619b5..d2597a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -41,6 +41,6 @@ public interface TaskAssigner {
       MutableStoreProvider storeProvider,
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
+      Set<IAssignedTask> tasks,
       Map<String, TaskGroupKey> preemptionReservations);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
index 54bd177..ec416cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -21,22 +22,22 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager.LaunchException;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
@@ -64,7 +65,6 @@ public class TaskAssignerImpl implements TaskAssigner {
   private final StateManager stateManager;
   private final MesosTaskFactory taskFactory;
   private final OfferManager offerManager;
-  private final TierManager tierManager;
   private final UpdateAgentReserver updateAgentReserver;
   private final OfferSelector offerSelector;
 
@@ -73,7 +73,6 @@ public class TaskAssignerImpl implements TaskAssigner {
       StateManager stateManager,
       MesosTaskFactory taskFactory,
       OfferManager offerManager,
-      TierManager tierManager,
       UpdateAgentReserver updateAgentReserver,
       StatsProvider statsProvider,
       OfferSelector offerSelector) {
@@ -81,7 +80,6 @@ public class TaskAssignerImpl implements TaskAssigner {
     this.stateManager = requireNonNull(stateManager);
     this.taskFactory = requireNonNull(taskFactory);
     this.offerManager = requireNonNull(offerManager);
-    this.tierManager = requireNonNull(tierManager);
     this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
     this.offerSelector = requireNonNull(offerSelector);
@@ -99,7 +97,7 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private Protos.TaskInfo assign(
-      Storage.MutableStoreProvider storeProvider,
+      MutableStoreProvider storeProvider,
       Protos.Offer offer,
       String taskId,
       boolean revocable) {
@@ -118,20 +116,18 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private void launchUsingOffer(
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
+      MutableStoreProvider stores,
       ResourceRequest resourceRequest,
       IAssignedTask task,
-      HostOffer offer,
-      ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+      HostOffer offer) throws LaunchException {
 
     String taskId = task.getTaskId();
-    Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+    Protos.TaskInfo taskInfo =
+        assign(stores, offer.getOffer(), taskId, resourceRequest.isRevocable());
     resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
     try {
       offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-      assignmentResult.add(taskId);
-    } catch (OfferManager.LaunchException e) {
+    } catch (LaunchException e) {
       LOG.warn("Failed to launch task.", e);
       launchFailures.incrementAndGet();
 
@@ -139,147 +135,144 @@ public class TaskAssignerImpl implements TaskAssigner {
       // It is in the LOST state and a new task will move to PENDING to replace it.
       // Should the state change fail due to storage issues, that's okay.  The task will
       // time out in the ASSIGNED state and be moved to LOST.
-      stateManager.changeState(
-          storeProvider,
-          taskId,
-          Optional.of(PENDING),
-          LOST,
-          LAUNCH_FAILED_MSG);
+      stateManager.changeState(stores, taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
       throw e;
     }
   }
 
-  private Iterable<IAssignedTask> maybeAssignReserved(
-      Iterable<IAssignedTask> tasks,
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      ImmutableSet.Builder<String> assignmentResult) {
+  private static final class ReservationStatus {
+    final boolean taskReserving;
+    final Optional<HostOffer> offer;
 
-    if (!updateAgentReserver.hasReservations(groupKey)) {
-      return tasks;
+    private ReservationStatus(boolean taskReserving, Optional<HostOffer> offer) {
+      this.taskReserving = taskReserving;
+      this.offer = requireNonNull(offer);
     }
 
-    // Data structure to record which tasks should be excluded from the regular (non-reserved)
-    // scheduling loop. This is important because we release reservations once they are used,
-    // so we need to record them separately to avoid them being double-scheduled.
-    ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
-    for (IAssignedTask task : tasks) {
-      IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
-      Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
-      if (maybeAgentId.isPresent()) {
-        excludeBuilder.add(key);
-        Optional<HostOffer> offer = offerManager.getMatching(
-            Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
-            resourceRequest,
-            revocable);
-        if (offer.isPresent()) {
-          try {
-            // The offer can still be veto'd because of changed constraints, or because the
-            // Scheduler hasn't been updated by Mesos yet...
-            launchUsingOffer(storeProvider,
-                revocable,
-                resourceRequest,
-                task,
-                offer.get(),
-                assignmentResult);
-            LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          } catch (OfferManager.LaunchException e) {
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          }
-        } else {
-          LOG.info(
-              "Tried to reuse offer on {} for {}, but was not ready yet.",
-              maybeAgentId.get(),
-              key);
-        }
-      }
+    static final ReservationStatus NOT_RESERVING = new ReservationStatus(false, Optional.empty());
+    static final ReservationStatus NOT_READY = new ReservationStatus(true, Optional.empty());
+
+    static ReservationStatus ready(HostOffer offer) {
+      return new ReservationStatus(true, Optional.of(offer));
     }
 
-    // Return only the tasks that didn't have reservations. Offers on agents that were reserved
-    // might not have been seen by Aurora yet, so we need to wait until the reservation expires
-    // before giving up and falling back to the first-fit algorithm.
-    Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
-    return Iterables.filter(tasks, t -> !toBeExcluded.contains(
-        InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+    boolean isTaskReserving() {
+      return taskReserving;
+    }
+
+    Optional<HostOffer> getOffer() {
+      return offer;
+    }
+  }
+
+  private ReservationStatus getReservation(IAssignedTask task, ResourceRequest resourceRequest) {
+
+    IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+    Optional<String> agentId = updateAgentReserver.getAgent(key);
+    if (!agentId.isPresent()) {
+      return ReservationStatus.NOT_RESERVING;
+    }
+    Optional<HostOffer> offer = offerManager.getMatching(
+        Protos.AgentID.newBuilder().setValue(agentId.get()).build(),
+        resourceRequest);
+    if (offer.isPresent()) {
+      LOG.info("Used update reservation for {} on {}", key, agentId.get());
+      updateAgentReserver.release(agentId.get(), key);
+      return ReservationStatus.ready(offer.get());
+    } else {
+      LOG.info(
+          "Tried to reuse offer on {} for {}, but was not ready yet.",
+          agentId.get(),
+          key);
+      return ReservationStatus.NOT_READY;
+    }
   }
 
   /**
    * Determine whether or not the offer is reserved for a different task via preemption or
    * update affinity.
    */
-  @SuppressWarnings("PMD.UselessParentheses")  // TODO(jly): PMD bug, remove when upgrade from 5.5.3
-  private boolean isAgentReserved(HostOffer offer,
-                                  TaskGroupKey groupKey,
-                                  Map<String, TaskGroupKey> preemptionReservations) {
+  private boolean isAgentReserved(
+      HostOffer offer,
+      TaskGroupKey groupKey,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
     String agentId = offer.getOffer().getAgentId().getValue();
-    Optional<TaskGroupKey> reservedGroup = Optional.ofNullable(
-        preemptionReservations.get(agentId));
+    boolean reservedForPreemption = Optional.ofNullable(preemptionReservations.get(agentId))
+        .map(group -> !group.equals(groupKey))
+        .orElse(false);
 
-    return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
-        || !updateAgentReserver.getReservations(agentId).isEmpty();
+    return reservedForPreemption || updateAgentReserver.isReserved(agentId);
   }
 
-  @Timed("assigner_maybe_assign")
-  @Override
-  public Set<String> maybeAssign(
-      Storage.MutableStoreProvider storeProvider,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
-      Map<String, TaskGroupKey> preemptionReservations) {
+  private static class SchedulingMatch {
+    final IAssignedTask task;
+    final HostOffer offer;
 
-    if (Iterables.isEmpty(tasks)) {
-      return ImmutableSet.of();
+    SchedulingMatch(IAssignedTask task, HostOffer offer) {
+      this.task = requireNonNull(task);
+      this.offer = requireNonNull(offer);
     }
+  }
 
-    boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
-    ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+  private Collection<SchedulingMatch> findMatches(
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
-    // Assign tasks reserved for a specific agent (e.g. for update affinity)
-    Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
-        tasks,
-        storeProvider,
-        revocable,
-        resourceRequest,
-        groupKey,
-        assignmentResult);
+    // Avoid matching multiple tasks against any offer.
+    Map<String, SchedulingMatch> matchesByOffer = Maps.newHashMap();
 
-    // Assign the rest of the non-reserved tasks
-    for (IAssignedTask task : nonReservedTasks) {
-      try {
+    tasks.forEach(task -> {
+      ReservationStatus reservation = getReservation(task, resourceRequest);
+      Optional<HostOffer> chosenOffer;
+      if (reservation.isTaskReserving()) {
+        // Use the reserved offer, which may not currently exist.
+        chosenOffer = reservation.getOffer();
+      } else {
         // Get all offers that will satisfy the given ResourceRequest and that are not reserved
         // for updates or preemption
-        FluentIterable<HostOffer> matchingOffers = FluentIterable
-            .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
-            .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+        Iterable<HostOffer> matchingOffers = Iterables.filter(
+            offerManager.getAllMatching(groupKey, resourceRequest),
+            o -> !matchesByOffer.containsKey(o.getOffer().getId().getValue())
+                && !isAgentReserved(o, groupKey, preemptionReservations));
 
         // Determine which is the optimal offer to select for the given request
-        Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
-
-        // If no offer is chosen, continue to the next task
-        if (!optionalOffer.isPresent()) {
-          continue;
-        }
-
-        // Attempt to launch the task using the chosen offer
-        HostOffer offer = optionalOffer.get();
-        launchUsingOffer(storeProvider,
-            revocable,
-            resourceRequest,
-            task,
-            offer,
-            assignmentResult);
-      } catch (OfferManager.LaunchException e) {
+        chosenOffer = offerSelector.select(matchingOffers, resourceRequest);
+      }
+
+      chosenOffer.ifPresent(hostOffer -> {
+        matchesByOffer.put(
+            hostOffer.getOffer().getId().getValue(),
+            new SchedulingMatch(task, hostOffer));
+      });
+    });
+
+    return matchesByOffer.values();
+  }
+
+  @Timed("assigner_maybe_assign")
+  @Override
+  public Set<String> maybeAssign(
+      MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> reservations) {
+
+    ImmutableSet.Builder<String> assigned = ImmutableSet.builder();
+
+    for (SchedulingMatch match : findMatches(resourceRequest, groupKey, tasks, reservations)) {
+      try {
+        launchUsingOffer(storeProvider, resourceRequest, match.task, match.offer);
+        assigned.add(match.task.getTaskId());
+      } catch (LaunchException e) {
         // Any launch exception causes the scheduling round to terminate for this TaskGroup.
         break;
       }
     }
 
-    return assignmentResult.build();
+    return assigned.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 3c38f95..d2f3257 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -31,5 +31,5 @@ public interface TaskScheduler extends EventSubscriber {
    * @return Successfully scheduled task IDs. The caller should call schedule again if a given
    *         task ID was not present in the result.
    */
-  Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
+  Set<String> schedule(MutableStoreProvider storeProvider, Set<String> taskIds);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
index cff4ab1..edab03d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.inject.Inject;
@@ -26,7 +27,6 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -34,16 +34,17 @@ import com.google.common.eventbus.Subscribe;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -55,10 +56,8 @@ import static java.lang.annotation.ElementType.METHOD;
 import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toMap;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
 /**
  * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -81,6 +80,7 @@ public class TaskSchedulerImpl implements TaskScheduler {
   private final TaskAssigner assigner;
   private final Preemptor preemptor;
   private final ExecutorSettings executorSettings;
+  private final TierManager tierManager;
   private final BiCache<String, TaskGroupKey> reservations;
 
   private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
@@ -92,16 +92,18 @@ public class TaskSchedulerImpl implements TaskScheduler {
       TaskAssigner assigner,
       Preemptor preemptor,
       ExecutorSettings executorSettings,
+      TierManager tierManager,
       BiCache<String, TaskGroupKey> reservations) {
 
     this.assigner = requireNonNull(assigner);
     this.preemptor = requireNonNull(preemptor);
     this.executorSettings = requireNonNull(executorSettings);
+    this.tierManager = requireNonNull(tierManager);
     this.reservations = requireNonNull(reservations);
   }
 
   @Timed("task_schedule_attempt")
-  public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+  public Set<String> schedule(MutableStoreProvider store, Set<String> taskIds) {
     try {
       return scheduleTasks(store, taskIds);
     } catch (RuntimeException e) {
@@ -115,77 +117,67 @@ public class TaskSchedulerImpl implements TaskScheduler {
     }
   }
 
-  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
-    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
-    String taskIdValues = Joiner.on(",").join(taskIds);
-    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
-    ImmutableSet<IAssignedTask> assignedTasks =
-        ImmutableSet.copyOf(Iterables.transform(
-            store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
-            IScheduledTask::getAssignedTask));
-
-    if (Iterables.isEmpty(assignedTasks)) {
-      LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
-      return taskIds;
+  private Map<String, IAssignedTask> fetchTasks(StoreProvider store, Set<String> ids) {
+    Map<String, IAssignedTask> tasks = store.getTaskStore()
+        .fetchTasks(Query.taskScoped(ids).byStatus(PENDING))
+        .stream()
+        .map(IScheduledTask::getAssignedTask)
+        .collect(Collectors.toMap(
+            IAssignedTask::getTaskId,
+            Function.identity()
+        ));
+
+    if (ids.size() != tasks.size()) {
+      LOG.warn("Failed to look up tasks "
+          + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
     }
+    return tasks;
+  }
 
-    Preconditions.checkState(
-        assignedTasks.stream()
-            .collect(Collectors.groupingBy(t -> t.getTask()))
-            .entrySet()
-            .size() == 1,
-        "Found multiple task groups for %s",
-        taskIdValues);
-
-    Map<String, IAssignedTask> assignableTaskMap =
-        assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+  private Set<String> scheduleTasks(MutableStoreProvider store, Set<String> ids) {
+    LOG.debug("Attempting to schedule tasks {}", ids);
+    Map<String, IAssignedTask> tasksById = fetchTasks(store, ids);
 
-    if (taskIds.size() != assignedTasks.size()) {
-      LOG.warn("Failed to look up tasks "
-          + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+    if (tasksById.isEmpty()) {
+      // None of the tasks were found in storage.  This could be caused by a task group that was
+      // killed by the user, for example.
+      return ids;
     }
 
-    // This is safe after all checks above.
-    ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+    // Prepare scheduling context for the tasks
+    ITaskConfig task = Iterables.getOnlyElement(tasksById.values().stream()
+        .map(IAssignedTask::getTask)
+        .collect(Collectors.toSet()));
     AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
 
-    // Valid Docker tasks can have a container but no executor config
-    ResourceBag overhead = ResourceBag.EMPTY;
-    if (task.isSetExecutorConfig()) {
-      overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-          .orElseThrow(
-              () -> new IllegalArgumentException("Cannot find executor configuration"));
-    }
-
+    // Attempt to schedule using available resources.
     Set<String> launched = assigner.maybeAssign(
         store,
-        new SchedulingFilter.ResourceRequest(
-            task,
-            bagFromResources(task.getResources()).add(overhead), aggregate),
+        ResourceRequest.fromTask(task, executorSettings, aggregate, tierManager),
         TaskGroupKey.from(task),
-        assignedTasks,
+        ImmutableSet.copyOf(tasksById.values()),
         reservations.asMap());
 
-    attemptsFired.addAndGet(assignableTaskMap.size());
-    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+    attemptsFired.addAndGet(tasksById.size());
 
-    failedToLaunch.forEach(taskId -> {
-      // Task could not be scheduled.
+    // Fall back to preemption for tasks not scheduled above.
+    Set<String> unassigned = Sets.difference(tasksById.keySet(), launched);
+    unassigned.forEach(taskId -> {
       // TODO(maxim): Now that preemption slots are searched asynchronously, consider
       // retrying a launch attempt within the current scheduling round IFF a reservation is
       // available.
-      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+      maybePreemptFor(tasksById.get(taskId), aggregate, store);
     });
-    attemptsNoMatch.addAndGet(failedToLaunch.size());
+    attemptsNoMatch.addAndGet(unassigned.size());
 
     // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
-    return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+    return Sets.union(launched, Sets.difference(ids, tasksById.keySet()));
   }
 
   private void maybePreemptFor(
       IAssignedTask task,
       AttributeAggregate jobState,
-      Storage.MutableStoreProvider storeProvider) {
+      MutableStoreProvider storeProvider) {
 
     if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
       return;

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 1219215..ebb345d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage;
 
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Set;
 
@@ -47,7 +48,7 @@ public interface TaskStore {
    * @param query Builder of the query to identify tasks with.
    * @return A read-only view of matching tasks.
    */
-  Iterable<IScheduledTask> fetchTasks(Query.Builder query);
+  Collection<IScheduledTask> fetchTasks(Query.Builder query);
 
   /**
    * Fetches all job keys represented in the task store.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
index dea8e69..8d70cae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.durability;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -327,7 +328,7 @@ public class WriteRecorder implements
   }
 
   @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+  public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
     return this.taskStore.fetchTasks(query);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
index b6909a6..59eca7b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
@@ -14,12 +14,9 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Optional;
-import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.slf4j.Logger;
@@ -58,21 +55,13 @@ public interface UpdateAgentReserver {
   Optional<String> getAgent(IInstanceKey key);
 
   /**
-   * Get all reservations for a given agent id. Useful for skipping over the agent between the
+   * Checks whether an agent is currently reserved. Useful for skipping over the agent between the
    * reserve/release window.
    *
    * @param agentId The agent id to look up reservations for.
    * @return A set of keys reserved for that agent.
    */
-  Set<IInstanceKey> getReservations(String agentId);
-
-  /**
-   * Check if the agent reserver has any reservations for the provided key.
-   *
-   * @param groupKey The key to check.
-   * @return True if there are reservations against any instances in that key.
-   */
-  boolean hasReservations(TaskGroupKey groupKey);
+  boolean isReserved(String agentId);
 
   /**
    * Implementation of the update reserver backed by a BiCache (the same mechanism we use for
@@ -99,16 +88,9 @@ public interface UpdateAgentReserver {
       cache.remove(key, agentId);
     }
 
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return cache.getByValue(agentId);
-    }
-
     @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
-      return cache.asMap().entrySet().stream()
-          .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob()))
-          .findFirst()
-          .isPresent();
+    public boolean isReserved(String agentId) {
+      return !cache.getByValue(agentId).isEmpty();
     }
 
     @Override
@@ -137,12 +119,7 @@ public interface UpdateAgentReserver {
     }
 
     @Override
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return ImmutableSet.of();
-    }
-
-    @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
+    public boolean isReserved(String agentId) {
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
index 82e40d5..f116e3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
@@ -48,6 +48,13 @@ public class TierManagerTest {
   }
 
   @Test
+  public void testDefaultTier() {
+    assertEquals(
+        DEV_TIER,
+        TIER_MANAGER.getTier(ITaskConfig.build(new TaskConfig())));
+  }
+
+  @Test
   public void testGetTierRevocableAndProduction() {
     assertEquals(
         REVOCABLE_TIER,

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 64d7a44..7136711 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -23,13 +23,12 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.TaskVars;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -53,8 +52,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final UnusedResource RESOURCE = new UnusedResource(
       ResourceManager.bagFromResources(TASK.getResources()),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
-  private static final ResourceRequest REQUEST =
-      new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
+  private static final ResourceRequest REQUEST = TaskTestUtil.toResourceRequest(TASK);
 
   private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0de90d7..21d4e47 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -38,14 +38,13 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
-import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -57,8 +56,10 @@ import org.junit.Test;
 import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -135,22 +136,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(noPortTask, bag(noPortTask), empty())));
+            TaskTestUtil.toResourceRequest(noPortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(onePortTask, bag(onePortTask), empty())));
+            TaskTestUtil.toResourceRequest(onePortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
+            TaskTestUtil.toResourceRequest(twoPortTask)));
     assertEquals(
         ImmutableSet.of(veto(PORTS, 1)),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(threePortTask, bag(threePortTask), empty())));
+            TaskTestUtil.toResourceRequest(threePortTask)));
   }
 
   @Test
@@ -238,13 +239,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(Veto.maintenance("draining")),
-        defaultFilter.filter(unusedResource, request));
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -262,14 +262,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(),
-        defaultFilter.filter(unusedResource, request));
-
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -350,7 +348,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testLimitWithinJob() throws Exception {
+  public void testLimitWithinJob() {
     control.replay();
 
     AttributeAggregate stateA = AttributeAggregate.create(
@@ -465,7 +463,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.of(),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(task, bag(task), empty())));
+            TaskTestUtil.toResourceRequest(task)));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -577,7 +575,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), aggregate))
+            TaskTestUtil.toResourceRequest(task))
             .isEmpty());
 
     Constraint negated = constraint.deepCopy();
@@ -587,7 +585,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         !expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(negatedTask, bag(negatedTask), aggregate))
+            ResourceRequest.fromTask(negatedTask, NO_OVERHEAD_EXECUTOR, aggregate, TIER_MANAGER))
             .isEmpty());
     return task;
   }
@@ -618,7 +616,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.copyOf(vetoes),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), jobState)));
+            ResourceRequest.fromTask(task, NO_OVERHEAD_EXECUTOR, jobState, TIER_MANAGER)));
   }
 
   private static IHostAttributes hostAttributes(
@@ -686,10 +684,4 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig makeTask() {
     return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
   }
-
-  private ResourceBag bag(ITaskConfig task) {
-    return ResourceManager.bagFromResources(task.getResources())
-        .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead(
-            task.getExecutorConfig().getName()).get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index c27a662..686087e 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -124,15 +125,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
       .setAgentId(SLAVE)
       .setHostname("slave-hostname")
       .addAllResources(mesosScalarFromBag(bagFromResources(
-              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(
-                  TASK_CONFIG.getExecutorConfig().getName()).get())))
+              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
   private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
       .clearResources()
       .addAllResources(mesosScalarFromBag(bagFromResources(
-          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(
-              TASK_CONFIG.getExecutorConfig().getName()).get())))
+          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
 
@@ -282,10 +281,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     ResourceBag executorResources =
         bagFromMesosResources(taskInfo.getExecutor().getResourcesList());
 
-    assertEquals(
-        bagFromResources(task.getResources()).add(
-            config.getExecutorOverhead(task.getExecutorConfig().getName()).get()),
-        taskResources.add(executorResources));
+    assertEquals(ResourceManager.bagFromTask(task, config), taskResources.add(executorResources));
   }
 
   private void checkDiscoveryInfoUnset(TaskInfo taskInfo) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 1e9532b..28224a5 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.common.util.testing.FakeTicker;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -37,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -57,7 +57,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
@@ -100,10 +99,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final int PORT = 1000;
   private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
   private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
-      TASK.getAssignedTask().getTask(),
-      ResourceBag.EMPTY,
-      empty());
+  private static final ResourceRequest EMPTY_REQUEST =
+      TaskTestUtil.toResourceRequest(TASK.getAssignedTask().getTask());
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
@@ -250,14 +247,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
 
     // Add static ban.
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -269,14 +266,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban expires after maximum amount of time an offer is held.
     FAKE_TICKER.advance(RETURN_DELAY);
     offerManager.cleanupStaticBans();
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -289,7 +286,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
@@ -297,7 +294,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -361,14 +358,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
 
     offerManager.cancel(OFFER_A_ID);
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -423,7 +420,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -460,7 +457,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -488,7 +485,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -516,7 +513,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -555,7 +552,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -628,7 +625,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     control.replay();
     offerManager.add(OFFER_A);
     assertEquals(Optional.of(OFFER_A),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
   }
 
   @Test
@@ -640,7 +637,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_A_ID);
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
 
@@ -655,7 +652,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -669,7 +666,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_C);
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
   }
 
@@ -685,7 +682,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_B.getOffer().getId());
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
@@ -702,7 +699,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
@@ -727,7 +724,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(empty, OFFER_A),
         ImmutableSet.copyOf(offerManager.getAll()));
@@ -750,7 +747,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_B),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),