You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/14 21:48:12 UTC

aurora git commit: Resources: reversing Resource/ResourceSlot relationship

Repository: aurora
Updated Branches:
  refs/heads/master 76d5a49ab -> 3676ec25b


Resources: reversing Resource/ResourceSlot relationship

Bugs closed: AURORA-1415

Reviewed at https://reviews.apache.org/r/37153/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3676ec25
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3676ec25
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3676ec25

Branch: refs/heads/master
Commit: 3676ec25b28c348d3ba4949e439f6d5c9f0dddb1
Parents: 76d5a49
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Aug 14 12:16:04 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Aug 14 12:16:04 2015 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/benchmark/Offers.java     |   4 +-
 .../apache/aurora/scheduler/ResourceSlot.java   | 281 ++++++++++++++-----
 .../apache/aurora/scheduler/ResourceType.java   |  54 ++++
 .../org/apache/aurora/scheduler/Resources.java  | 188 ++-----------
 .../aurora/scheduler/app/SchedulerMain.java     |   4 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |   4 +-
 .../scheduler/mesos/ExecutorSettings.java       |  14 +-
 .../scheduler/mesos/MesosTaskFactory.java       |  12 +-
 .../scheduler/preemptor/PreemptionVictim.java   |  20 +-
 .../preemptor/PreemptionVictimFilter.java       |  15 +-
 .../aurora/scheduler/state/TaskAssigner.java    |   3 +-
 .../scheduler/stats/AsyncStatsModule.java       |   3 +-
 .../aurora/scheduler/ResourceSlotTest.java      | 130 +++++++--
 .../apache/aurora/scheduler/ResourcesTest.java  | 106 +------
 .../aurora/scheduler/app/SchedulerIT.java       |   4 +-
 .../local/simulator/ClusterSimulatorModule.java |  13 +-
 .../events/NotifyingSchedulingFilterTest.java   |   2 +-
 .../filter/SchedulingFilterImplTest.java        |  12 +-
 .../mesos/MesosTaskFactoryImplTest.java         |  22 +-
 .../apache/aurora/scheduler/mesos/Offers.java   |  14 +-
 .../aurora/scheduler/mesos/TaskExecutors.java   |   4 +-
 .../preemptor/PreemptionVictimFilterTest.java   |   4 +-
 .../scheduler/state/TaskAssignerImplTest.java   |  35 +--
 23 files changed, 487 insertions(+), 461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index 269c196..9f3ce16 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -20,7 +20,7 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos;
@@ -88,7 +88,7 @@ final class Offers {
       int id = 0;
       for (IHostAttributes attributes : hostAttributes) {
         Protos.Offer offer = Protos.Offer.newBuilder()
-            .addAllResources(new Resources(cpu, ram, disk, ports).toResourceList())
+            .addAllResources(new ResourceSlot(cpu, ram, disk, ports).toResourceList())
             .setId(Protos.OfferID.newBuilder().setValue(String.format(OFFER_ID_FORMAT, id++)))
             .setFrameworkId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID))
             .setSlaveId(Protos.SlaveID.newBuilder().setValue(attributes.getSlaveId()))

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index e10656b..e5953bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -13,77 +13,181 @@
  */
 package org.apache.aurora.scheduler;
 
-import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Range;
+
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
+import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
-import org.apache.aurora.scheduler.preemptor.PreemptionVictim;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
 
-import static org.apache.mesos.Protos.Offer;
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.quantity.Data.BYTES;
+
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
 
 /**
- * Resource containing class that is aware of executor overhead.
+ * Represents a single task/host aggregate resource vector unaware of any Mesos resource traits.
  */
 public final class ResourceSlot {
-  // TODO(zmanji): Remove this class and overhead in 0.8.0 (AURORA-906)
 
-  private final Resources resources;
+  private final double numCpus;
+  private final Amount<Long, Data> disk;
+  private final Amount<Long, Data> ram;
+  private final int numPorts;
+
+  /**
+   * Empty ResourceSlot value.
+   */
+  public static final ResourceSlot NONE =
+      new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0);
+
+  public ResourceSlot(
+      double numCpus,
+      Amount<Long, Data> ram,
+      Amount<Long, Data> disk,
+      int numPorts) {
+
+    this.numCpus = numCpus;
+    this.ram = requireNonNull(ram);
+    this.disk = requireNonNull(disk);
+    this.numPorts = numPorts;
+  }
 
   /**
    * Minimum resources required to run Thermos. In the wild Thermos needs about 0.01 CPU and
    * about 170MB (peak usage) of RAM. The RAM requirement has been rounded up to a power of 2.
    */
   @VisibleForTesting
-  public static final Resources MIN_THERMOS_RESOURCES = new Resources(
+  public static final ResourceSlot MIN_THERMOS_RESOURCES = new ResourceSlot(
       0.01,
       Amount.of(256L, Data.MB),
       Amount.of(1L, Data.MB),
       0);
 
-  private ResourceSlot(Resources r) {
-    this.resources = r;
+  /**
+   * Extracts the resources required from a task.
+   *
+   * @param task Task to get resources from.
+   * @return The resources required by the task.
+   */
+  public static ResourceSlot from(ITaskConfig task) {
+    requireNonNull(task);
+    return new ResourceSlot(
+        task.getNumCpus(),
+        Amount.of(task.getRamMb(), Data.MB),
+        Amount.of(task.getDiskMb(), Data.MB),
+        task.getRequestedPorts().size());
   }
 
-  public static ResourceSlot from(ITaskConfig task, ExecutorSettings executorSettings) {
-    return from(Resources.from(task), executorSettings);
+  /**
+   * Adapts this slot object to a list of mesos resources.
+   *
+   * @param selectedPorts The ports selected, to be applied as concrete task ranges.
+   * @return Mesos resources.
+   */
+  public List<Protos.Resource> toResourceList(Set<Integer> selectedPorts) {
+    ImmutableList.Builder<Protos.Resource> resourceBuilder =
+        ImmutableList.<Protos.Resource>builder()
+            .add(makeMesosResource(CPUS, numCpus))
+            .add(makeMesosResource(DISK_MB, disk.as(Data.MB)))
+            .add(makeMesosResource(RAM_MB, ram.as(Data.MB)));
+    if (!selectedPorts.isEmpty()) {
+      resourceBuilder.add(makeMesosRangeResource(PORTS, selectedPorts));
+    }
+
+    return resourceBuilder.build();
   }
 
-  public static ResourceSlot from(PreemptionVictim victim, ExecutorSettings executorSettings) {
-    return from(victim.getResources(), executorSettings);
+  /**
+   * Convenience method for adapting to mesos resources without applying a port range.
+   *
+   * @see {@link #toResourceList(java.util.Set)}
+   * @return Mesos resources.
+   */
+  public List<Protos.Resource> toResourceList() {
+    return toResourceList(ImmutableSet.of());
   }
 
-  private static ResourceSlot from(Resources resources, ExecutorSettings executorSettings) {
+  /**
+   * Adds executor resource overhead.
+   *
+   * @param executorSettings Executor settings to get executor overhead from.
+   * @return ResourceSlot with overhead applied.
+   */
+  public ResourceSlot withOverhead(ExecutorSettings executorSettings) {
     // Apply a flat 'tax' of executor overhead resources to the task.
-    Resources requiredTaskResources = sum(
-        resources,
-        executorSettings.getExecutorOverhead());
+    ResourceSlot requiredTaskResources = add(executorSettings.getExecutorOverhead());
 
     // Upsize tasks smaller than the minimum resources required to run the executor.
-    return new ResourceSlot(maxElements(requiredTaskResources, MIN_THERMOS_RESOURCES));
+    return maxElements(requiredTaskResources, MIN_THERMOS_RESOURCES);
+  }
+
+  /**
+   * Creates a mesos resource of integer ranges.
+   *
+   * @param resourceType Resource type.
+   * @param values Values to translate into ranges.
+   * @return A mesos ranges resource.
+   */
+  @VisibleForTesting
+  static Protos.Resource makeMesosRangeResource(
+      ResourceType resourceType,
+      Set<Integer> values) {
+
+    return Protos.Resource.newBuilder()
+        .setName(resourceType.getName())
+        .setType(Protos.Value.Type.RANGES)
+        .setRanges(Protos.Value.Ranges.newBuilder()
+            .addAllRange(Iterables.transform(Numbers.toRanges(values), RANGE_TRANSFORM)))
+        .build();
+  }
+
+  /**
+   * Creates a scalar mesos resource.
+   *
+   * @param resourceType Resource type.
+   * @param value Value for the resource.
+   * @return A mesos resource.
+   */
+  @VisibleForTesting
+  static Protos.Resource makeMesosResource(ResourceType resourceType, double value) {
+    return Protos.Resource.newBuilder()
+        .setName(resourceType.getName())
+        .setType(Protos.Value.Type.SCALAR)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value))
+        .build();
   }
 
   /**
-   * Generates a Resource where each resource component is a max out of the two components.
+   * Generates a ResourceSlot where each resource component is a max out of the two components.
    *
    * @param a A resource to compare.
    * @param b A resource to compare.
    *
-   * @return Returns a Resources instance where each component is a max of the two components.
+   * @return Returns a ResourceSlot instance where each component is a max of the two components.
    */
   @VisibleForTesting
-  static Resources maxElements(Resources a, Resources b) {
+  static ResourceSlot maxElements(ResourceSlot a, ResourceSlot b) {
     double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus());
     Amount<Long, Data> maxRAM = Amount.of(
         Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)),
@@ -93,27 +197,43 @@ public final class ResourceSlot {
         Data.MB);
     int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts());
 
-    return new Resources(maxCPU, maxRAM, maxDisk, maxPorts);
-  }
-
-  public static ResourceSlot from(Offer offer) {
-    return new ResourceSlot(Resources.from(offer));
+    return new ResourceSlot(maxCPU, maxRAM, maxDisk, maxPorts);
   }
 
+  /**
+   * Number of CPUs.
+   *
+   * @return CPUs.
+   */
   public double getNumCpus() {
-    return resources.getNumCpus();
+    return numCpus;
   }
 
-  public Amount<Long, Data> getRam() {
-    return resources.getRam();
+  /**
+   * Disk amount.
+   *
+   * @return Disk.
+   */
+  public Amount<Long, Data> getDisk() {
+    return disk;
   }
 
-  public Amount<Long, Data> getDisk() {
-    return resources.getDisk();
+  /**
+   * RAM amount.
+   *
+   * @return RAM.
+   */
+  public Amount<Long, Data> getRam() {
+    return ram;
   }
 
+  /**
+   * Number of ports.
+   *
+   * @return Port count.
+   */
   public int getNumPorts() {
-    return resources.getNumPorts();
+    return numPorts;
   }
 
   @Override
@@ -123,71 +243,69 @@ public final class ResourceSlot {
     }
 
     ResourceSlot other = (ResourceSlot) o;
-    return resources.equals(other.resources);
+    return Objects.equals(numCpus, other.numCpus)
+        && Objects.equals(ram, other.ram)
+        && Objects.equals(disk, other.disk)
+        && Objects.equals(numPorts, other.numPorts);
   }
 
   @Override
   public int hashCode() {
-    return resources.hashCode();
+    return Objects.hash(numCpus, ram, disk, numPorts);
   }
 
-  public static ResourceSlot sum(ResourceSlot... rs) {
-    return sum(Arrays.asList(rs));
-  }
-
-  public static ResourceSlot sum(Iterable<ResourceSlot> rs) {
-    Resources sum = Resources.NONE;
+  /**
+   * Sums up all resources in {@code slots}.
+   *
+   * @param slots Resource slots to sum up.
+   * @return Sum of all resource slots.
+   */
+  public static ResourceSlot sum(Iterable<ResourceSlot> slots) {
+    ResourceSlot sum = NONE;
 
-    for (ResourceSlot r : rs) {
-      double numCpus = sum.getNumCpus() + r.getNumCpus();
-      Amount<Long, Data> disk =
-          Amount.of(sum.getDisk().as(Data.BYTES) + r.getDisk().as(Data.BYTES), Data.BYTES);
-      Amount<Long, Data> ram =
-          Amount.of(sum.getRam().as(Data.BYTES) + r.getRam().as(Data.BYTES), Data.BYTES);
-      int ports = sum.getNumPorts() + r.getNumPorts();
-      sum = new Resources(numCpus, ram, disk, ports);
+    for (ResourceSlot r : slots) {
+      sum = sum.add(r);
     }
 
-    return new ResourceSlot(sum);
-  }
-
-  @VisibleForTesting
-  public static Resources sum(Resources a, Resources b) {
-    return sum(ImmutableList.of(new ResourceSlot(a), new ResourceSlot(b))).resources;
+    return sum;
   }
 
-  public static ResourceSlot subtract(ResourceSlot a, Resources b) {
-    return new ResourceSlot(subtract(a.resources, b));
-  }
-
-  @VisibleForTesting
-  static Resources subtract(Resources a, Resources b) {
-    return new Resources(
-        a.getNumCpus() - b.getNumCpus(),
-        Amount.of(a.getRam().as(Data.MB) - b.getRam().as(Data.MB), Data.MB),
-        Amount.of(a.getDisk().as(Data.MB) - b.getDisk().as(Data.MB), Data.MB),
-        a.getNumPorts() - b.getNumPorts());
+  /**
+   * Adds {@code other}.
+   *
+   * @param other Resource slot to add.
+   * @return Result.
+   */
+  public ResourceSlot add(ResourceSlot other) {
+    return new ResourceSlot(
+        getNumCpus() + other.getNumCpus(),
+        Amount.of(getRam().as(BYTES) + other.getRam().as(BYTES), BYTES),
+        Amount.of(getDisk().as(BYTES) + other.getDisk().as(BYTES), BYTES),
+        getNumPorts() + other.getNumPorts());
   }
 
-  public List<Protos.Resource> toResourceList(Set<Integer> selectedPorts) {
-    return resources.toResourceList(selectedPorts);
+  /**
+   * Subtracts {@code other}.
+   *
+   * @param other Resource slot to subtract.
+   * @return Result.
+   */
+  public ResourceSlot subtract(ResourceSlot other) {
+    return new ResourceSlot(
+        getNumCpus() - other.getNumCpus(),
+        Amount.of(getRam().as(BYTES) - other.getRam().as(BYTES), BYTES),
+        Amount.of(getDisk().as(BYTES) - other.getDisk().as(BYTES), BYTES),
+        getNumPorts() - other.getNumPorts());
   }
 
-  public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
-    @Override
-    public int compare(ResourceSlot left, ResourceSlot right) {
-      return RESOURCE_ORDER.compare(left.resources, right.resources);
-    }
-  };
-
   /**
    * A Resources object is greater than another iff _all_ of its resource components are greater
    * or equal. A Resources object compares as equal if some but not all components are greater than
    * or equal to the other.
    */
-  public static final Ordering<Resources> RESOURCE_ORDER = new Ordering<Resources>() {
+  public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
     @Override
-    public int compare(Resources left, Resources right) {
+    public int compare(ResourceSlot left, ResourceSlot right) {
       int diskC = left.getDisk().compareTo(right.getDisk());
       int ramC = left.getRam().compareTo(right.getRam());
       int portC = Integer.compare(left.getNumPorts(), right.getNumPorts());
@@ -213,4 +331,15 @@ public final class ResourceSlot {
   };
 
   private static final Predicate<Integer> IS_ZERO = e -> e == 0;
+
+  private static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
+      new Function<Range<Integer>, Protos.Value.Range>() {
+        @Override
+        public Protos.Value.Range apply(Range<Integer> input) {
+          return Protos.Value.Range.newBuilder()
+              .setBegin(input.lowerEndpoint())
+              .setEnd(input.upperEndpoint())
+              .build();
+        }
+      };
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/ResourceType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/ResourceType.java
new file mode 100644
index 0000000..b4efc8d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceType.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Defines Mesos resource types.
+ */
+@VisibleForTesting
+public enum ResourceType {
+  /**
+   * CPU resource.
+   */
+  CPUS("cpus"),
+
+  /**
+   * RAM resource.
+   */
+  RAM_MB("mem"),
+
+  /**
+   * DISK resource.
+   */
+  DISK_MB("disk"),
+
+  /**
+   * Port resource.
+   */
+  PORTS("ports");
+
+  private final String resourceName;
+
+  ResourceType(String resourceName) {
+    this.resourceName = requireNonNull(resourceName);
+  }
+
+  public String getName() {
+    return resourceName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index e380245..7b1b54e 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -18,9 +18,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
@@ -31,28 +29,22 @@ import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
-import org.apache.aurora.scheduler.base.Numbers;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
-import org.apache.mesos.Protos.Value.Ranges;
-import org.apache.mesos.Protos.Value.Scalar;
-import org.apache.mesos.Protos.Value.Type;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
+
 /**
  * A container for multiple resource vectors.
  * TODO(wfarner): Collapse this in with ResourceAggregates AURORA-105.
  */
-public class Resources {
-
-  public static final String CPUS = "cpus";
-  public static final String RAM_MB = "mem";
-  public static final String DISK_MB = "disk";
-  public static final String PORTS = "ports";
-
+public final class Resources {
   private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
       new Function<Range, Set<Integer>>() {
         @Override
@@ -68,50 +60,13 @@ public class Resources {
   private final Amount<Long, Data> ram;
   private final int numPorts;
 
-  /**
-   * Creates a new resources object.
-   *
-   * @param numCpus Number of CPUs.
-   * @param ram Amount of RAM.
-   * @param disk Amount of disk.
-   * @param numPorts Number of ports.
-   */
-  public Resources(double numCpus, Amount<Long, Data> ram, Amount<Long, Data> disk, int numPorts) {
+  private Resources(double numCpus, Amount<Long, Data> ram, Amount<Long, Data> disk, int numPorts) {
     this.numCpus = numCpus;
     this.ram = requireNonNull(ram);
     this.disk = requireNonNull(disk);
     this.numPorts = numPorts;
   }
 
-  /**
-   * Adapts this resources object to a list of mesos resources.
-   *
-   * @param selectedPorts The ports selected, to be applied as concrete task ranges.
-   * @return Mesos resources.
-   */
-  public List<Resource> toResourceList(Set<Integer> selectedPorts) {
-    ImmutableList.Builder<Resource> resourceBuilder =
-        ImmutableList.<Resource>builder()
-            .add(Resources.makeMesosResource(CPUS, numCpus))
-            .add(Resources.makeMesosResource(DISK_MB, disk.as(Data.MB)))
-            .add(Resources.makeMesosResource(RAM_MB, ram.as(Data.MB)));
-    if (!selectedPorts.isEmpty()) {
-      resourceBuilder.add(Resources.makeMesosRangeResource(Resources.PORTS, selectedPorts));
-    }
-
-    return resourceBuilder.build();
-  }
-
-  /**
-   * Convenience method for adapting to mesos resources without applying a port range.
-   *
-   * @see {@link #toResourceList(java.util.Set)}
-   * @return Mesos resources.
-   */
-  public List<Resource> toResourceList() {
-    return toResourceList(ImmutableSet.of());
-  }
-
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof Resources)) {
@@ -141,37 +96,6 @@ public class Resources {
   }
 
   /**
-   * Extracts the resources required from a task.
-   *
-   * @param task Task to get resources from.
-   * @return The resources required by the task.
-   */
-  public static Resources from(ITaskConfig task) {
-    requireNonNull(task);
-    return new Resources(
-        task.getNumCpus(),
-        Amount.of(task.getRamMb(), Data.MB),
-        Amount.of(task.getDiskMb(), Data.MB),
-        task.getRequestedPorts().size());
-  }
-
-  /**
-   * Extracts the resources specified in a list of resource objects.
-   *
-   * @param resources Resources to translate.
-   * @return The canonical resources.
-   */
-  public static Resources from(List<Resource> resources) {
-    requireNonNull(resources);
-    return new Resources(
-        getScalarValue(resources, CPUS),
-        Amount.of((long) getScalarValue(resources, RAM_MB), Data.MB),
-        Amount.of((long) getScalarValue(resources, DISK_MB), Data.MB),
-        getNumAvailablePorts(resources)
-    );
-  }
-
-  /**
    * Extracts the resources available in a slave offer.
    *
    * @param offer Offer to get resources from.
@@ -180,16 +104,12 @@ public class Resources {
   public static Resources from(Offer offer) {
     requireNonNull(offer);
     return new Resources(
-        getScalarValue(offer, CPUS),
-        Amount.of((long) getScalarValue(offer, RAM_MB), Data.MB),
-        Amount.of((long) getScalarValue(offer, DISK_MB), Data.MB),
+        getScalarValue(offer, CPUS.getName()),
+        Amount.of((long) getScalarValue(offer, RAM_MB.getName()), Data.MB),
+        Amount.of((long) getScalarValue(offer, DISK_MB.getName()), Data.MB),
         getNumAvailablePorts(offer.getResourcesList()));
   }
 
-  @VisibleForTesting
-  public static final Resources NONE =
-      new Resources(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0);
-
   private static int getNumAvailablePorts(List<Resource> resource) {
     int offeredPorts = 0;
     for (Range range : getPortRanges(resource)) {
@@ -212,20 +132,11 @@ public class Resources {
   }
 
   private static Resource getResource(List<Resource> resource, String key) {
-    return Iterables.find(resource, withName(key), null);
-  }
-
-  private static Predicate<Resource> withName(final String name) {
-    return new Predicate<Resource>() {
-      @Override
-      public boolean apply(Resource resource) {
-        return resource.getName().equals(name);
-      }
-    };
+    return Iterables.find(resource, e -> e.getName().equals(key), null);
   }
 
   private static Iterable<Range> getPortRanges(List<Resource> resources) {
-    Resource resource = getResource(resources, Resources.PORTS);
+    Resource resource = getResource(resources, PORTS.getName());
     if (resource == null) {
       return ImmutableList.of();
     }
@@ -234,79 +145,12 @@ public class Resources {
   }
 
   /**
-   * Creates a scalar mesos resource.
-   *
-   * @param name Name of the resource.
-   * @param value Value for the resource.
-   * @return A mesos resource.
-   */
-  public static Resource makeMesosResource(String name, double value) {
-    return Resource.newBuilder().setName(name).setType(Type.SCALAR)
-        .setScalar(Scalar.newBuilder().setValue(value)).build();
-  }
-
-  private static final Function<com.google.common.collect.Range<Integer>, Range> RANGE_TRANSFORM =
-      new Function<com.google.common.collect.Range<Integer>, Range>() {
-        @Override
-        public Range apply(com.google.common.collect.Range<Integer> input) {
-          return Range.newBuilder()
-              .setBegin(input.lowerEndpoint())
-              .setEnd(input.upperEndpoint())
-              .build();
-        }
-      };
-
-  /**
-   * Creates a mesos resource of integer ranges.
-   *
-   * @param name Name of the resource
-   * @param values Values to translate into ranges.
-   * @return A mesos ranges resource.
-   */
-  @VisibleForTesting
-  public static Resource makeMesosRangeResource(String name, Set<Integer> values) {
-    return Resource.newBuilder()
-        .setName(name)
-        .setType(Type.RANGES)
-        .setRanges(Ranges.newBuilder()
-            .addAllRange(Iterables.transform(Numbers.toRanges(values), RANGE_TRANSFORM)))
-        .build();
-  }
-
-  /**
-   * Number of CPUs.
-   *
-   * @return CPUs.
-   */
-  public double getNumCpus() {
-    return numCpus;
-  }
-
-  /**
-   * Disk amount.
-   *
-   * @return Disk.
-   */
-  public Amount<Long, Data> getDisk() {
-    return disk;
-  }
-
-  /**
-   * RAM amount.
-   *
-   * @return RAM.
-   */
-  public Amount<Long, Data> getRam() {
-    return ram;
-  }
-
-  /**
-   * Number of ports.
+   * Gets generalized aggregated resource view.
    *
-   * @return Port count.
+   * @return {@code ResourceSlot} instance.
    */
-  public int getNumPorts() {
-    return numPorts;
+  public ResourceSlot slot() {
+    return new ResourceSlot(numCpus, ram, disk, numPorts);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 102069a..e74b36b 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -47,7 +47,7 @@ import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientCon
 import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
 
 import org.apache.aurora.gen.Volume;
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
 import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.http.HttpService;
@@ -195,7 +195,7 @@ public class SchedulerMain extends AbstractApplication {
         .add(new AbstractModule() {
           @Override
           protected void configure() {
-            Resources executorOverhead = new Resources(
+            ResourceSlot executorOverhead = new ResourceSlot(
                 EXECUTOR_OVERHEAD_CPUS.get(),
                 EXECUTOR_OVERHEAD_RAM.get(),
                 Amount.of(0L, Data.MB),

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 08d7ac7..1038964 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -73,7 +73,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
       return range;
     }
 
-    private ResourceVector(String name, int range) {
+    ResourceVector(String name, int range) {
       this.name = name;
       this.range = range;
     }
@@ -204,6 +204,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     // 4. Resource check (lowest score).
     return getResourceVetoes(
         resource.getResourceSlot(),
-        ResourceSlot.from(request.getTask(), executorSettings));
+        ResourceSlot.from(request.getTask()).withOverhead(executorSettings));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/mesos/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/mesos/ExecutorSettings.java
index 44011f8..b3c9138 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/ExecutorSettings.java
@@ -19,7 +19,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.gen.Volume;
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 
 import static java.util.Objects.requireNonNull;
 
@@ -31,7 +31,7 @@ public final class ExecutorSettings {
   private final List<String> executorResources;
   private final String thermosObserverRoot;
   private final Optional<String> executorFlags;
-  private final Resources executorOverhead;
+  private final ResourceSlot executorOverhead;
   private final List<Volume> globalContainerMounts;
 
   ExecutorSettings(
@@ -39,7 +39,7 @@ public final class ExecutorSettings {
       List<String> executorResources,
       String thermosObserverRoot,
       Optional<String> executorFlags,
-      Resources executorOverhead,
+      ResourceSlot executorOverhead,
       List<Volume> globalContainerMounts) {
 
     this.executorPath = requireNonNull(executorPath);
@@ -66,7 +66,7 @@ public final class ExecutorSettings {
     return executorFlags;
   }
 
-  public Resources getExecutorOverhead() {
+  public ResourceSlot getExecutorOverhead() {
     return executorOverhead;
   }
 
@@ -83,13 +83,13 @@ public final class ExecutorSettings {
     private List<String> executorResources;
     private String thermosObserverRoot;
     private Optional<String> executorFlags;
-    private Resources executorOverhead;
+    private ResourceSlot executorOverhead;
     private List<Volume> globalContainerMounts;
 
     Builder() {
       executorResources = ImmutableList.of();
       executorFlags = Optional.absent();
-      executorOverhead = Resources.NONE;
+      executorOverhead = ResourceSlot.NONE;
       globalContainerMounts = ImmutableList.of();
     }
 
@@ -113,7 +113,7 @@ public final class ExecutorSettings {
       return this;
     }
 
-    public Builder setExecutorOverhead(Resources executorOverhead) {
+    public Builder setExecutorOverhead(ResourceSlot executorOverhead) {
       this.executorOverhead = executorOverhead;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index c160a53..ff6eb98 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -30,7 +30,6 @@ import com.twitter.common.quantity.Data;
 import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.SchedulerException;
@@ -110,7 +109,7 @@ public interface MesosTaskFactory {
      * anyway.
      */
     @VisibleForTesting
-    static final Resources RESOURCES_EPSILON = new Resources(
+    static final ResourceSlot RESOURCES_EPSILON = new ResourceSlot(
         0.01,
         Amount.of(32L, Data.MB),
         Amount.of(1L, Data.MB),
@@ -130,12 +129,13 @@ public interface MesosTaskFactory {
       }
 
       ITaskConfig config = task.getTask();
-      ResourceSlot resourceSlot =
-          ResourceSlot.subtract(ResourceSlot.from(config, executorSettings), RESOURCES_EPSILON);
+      ResourceSlot resourceSlot = ResourceSlot.from(config)
+          .withOverhead(executorSettings)
+          .subtract(RESOURCES_EPSILON);
 
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
-      List<Resource> resources = resourceSlot
-          .toResourceList(task.isSetAssignedPorts()
+      List<Resource> resources = resourceSlot.toResourceList(
+          task.isSetAssignedPorts()
               ? ImmutableSet.copyOf(task.getAssignedPorts().values())
               : ImmutableSet.of());
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 8d2f069..8162323 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -15,7 +15,7 @@ package org.apache.aurora.scheduler.preemptor;
 
 import java.util.Objects;
 
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -27,7 +27,7 @@ public final class PreemptionVictim {
   private final boolean production;
   private final String role;
   private final int priority;
-  private final Resources resources;
+  private final ResourceSlot resourceSlot;
   private final String taskId;
 
   private PreemptionVictim(
@@ -35,14 +35,14 @@ public final class PreemptionVictim {
       boolean production,
       String role,
       int priority,
-      Resources resources,
+      ResourceSlot resourceSlot,
       String taskId) {
 
     this.slaveHost = slaveHost;
     this.production = production;
     this.role = role;
     this.priority = priority;
-    this.resources = resources;
+    this.resourceSlot = resourceSlot;
     this.taskId = taskId;
   }
 
@@ -53,7 +53,7 @@ public final class PreemptionVictim {
         config.isProduction(),
         config.getJob().getRole(),
         config.getPriority(),
-        Resources.from(task.getTask()),
+        ResourceSlot.from(task.getTask()),
         task.getTaskId());
   }
 
@@ -73,8 +73,8 @@ public final class PreemptionVictim {
     return priority;
   }
 
-  public Resources getResources() {
-    return resources;
+  public ResourceSlot getResourceSlot() {
+    return resourceSlot;
   }
 
   public String getTaskId() {
@@ -92,13 +92,13 @@ public final class PreemptionVictim {
         && Objects.equals(isProduction(), other.isProduction())
         && Objects.equals(getRole(), other.getRole())
         && Objects.equals(getPriority(), other.getPriority())
-        && Objects.equals(getResources(), other.getResources())
+        && Objects.equals(getResourceSlot(), other.getResourceSlot())
         && Objects.equals(getTaskId(), other.getTaskId());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(slaveHost, production, role, priority, resources, taskId);
+    return Objects.hash(slaveHost, production, role, priority, resourceSlot, taskId);
   }
 
   @Override
@@ -108,7 +108,7 @@ public final class PreemptionVictim {
         .add("production", isProduction())
         .add("role", getRole())
         .add("priority", getPriority())
-        .add("resources", getResources())
+        .add("resourceSlot", getResourceSlot())
         .add("taskId", getTaskId())
         .toString();
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index 4293415..a0e71e1 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -40,6 +41,8 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.ResourceSlot.sum;
+
 /**
  * Filters active tasks (victims) and available offer (slack) resources that can accommodate a
  * given task (candidate), provided victims are preempted.
@@ -93,7 +96,7 @@ public interface PreemptionVictimFilter {
         new Function<HostOffer, ResourceSlot>() {
           @Override
           public ResourceSlot apply(HostOffer offer) {
-            return ResourceSlot.from(offer.getOffer());
+            return Resources.from(offer.getOffer()).slot();
           }
         };
 
@@ -117,7 +120,7 @@ public interface PreemptionVictimFilter {
         new Function<PreemptionVictim, ResourceSlot>() {
           @Override
           public ResourceSlot apply(PreemptionVictim victim) {
-            return ResourceSlot.from(victim, executorSettings);
+            return victim.getResourceSlot().withOverhead(executorSettings);
           }
         };
 
@@ -140,8 +143,7 @@ public interface PreemptionVictimFilter {
           .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
           .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build();
 
-      ResourceSlot slackResources =
-          ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT));
+      ResourceSlot slackResources = sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT));
 
       FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
           .filter(preemptionFilter(pendingTask));
@@ -166,9 +168,8 @@ public interface PreemptionVictimFilter {
       for (PreemptionVictim victim : sortedVictims) {
         toPreemptTasks.add(victim);
 
-        ResourceSlot totalResource = ResourceSlot.sum(
-            ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
-            slackResources);
+        ResourceSlot totalResource =
+            sum(Iterables.transform(toPreemptTasks, victimToResources)).add(slackResources);
 
         Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get()),

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 38375db..a7a4381 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -32,7 +32,6 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -149,7 +148,7 @@ public interface TaskAssigner {
           continue;
         }
         Set<Veto> vetoes = filter.filter(
-            new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+            new UnusedResource(Resources.from(offer.getOffer()).slot(), offer.getAttributes()),
             resourceRequest);
         if (vetoes.isEmpty()) {
           TaskInfo taskInfo = assign(

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 81b1640..74a6546 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -30,6 +30,7 @@ import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.Conversions;
@@ -139,7 +140,7 @@ public class AsyncStatsModule extends AbstractModule {
         new Function<HostOffer, MachineResource>() {
           @Override
           public MachineResource apply(HostOffer offer) {
-            Resources resources = Resources.from(offer.getOffer());
+            ResourceSlot resources = Resources.from(offer.getOffer()).slot();
             IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
                 .setNumCpus(resources.getNumCpus())
                 .setRamMb(resources.getRam().as(Data.MB))

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
index d1294e2..d537315 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
@@ -13,30 +13,52 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import com.twitter.common.collections.Pair;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource;
+import static org.apache.aurora.scheduler.ResourceSlot.makeMesosResource;
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 public class ResourceSlotTest {
 
-  private static final Resources NEGATIVE_ONE =
-      new Resources(-1.0, Amount.of(-1L, Data.MB), Amount.of(-1L, Data.MB), -1);
-  private static final Resources ONE =
-      new Resources(1.0, Amount.of(1L, Data.MB), Amount.of(1L, Data.MB), 1);
-  private static final Resources TWO =
-      new Resources(2.0, Amount.of(2L, Data.MB), Amount.of(2L, Data.MB), 2);
-  private static final Resources THREE =
-      new Resources(3.0, Amount.of(3L, Data.MB), Amount.of(3L, Data.MB), 3);
+  private static final ResourceSlot NEGATIVE_ONE =
+      new ResourceSlot(-1.0, Amount.of(-1L, Data.MB), Amount.of(-1L, Data.MB), -1);
+  private static final ResourceSlot ONE =
+      new ResourceSlot(1.0, Amount.of(1L, Data.MB), Amount.of(1L, Data.MB), 1);
+  private static final ResourceSlot TWO =
+      new ResourceSlot(2.0, Amount.of(2L, Data.MB), Amount.of(2L, Data.MB), 2);
+  private static final ResourceSlot THREE =
+      new ResourceSlot(3.0, Amount.of(3L, Data.MB), Amount.of(3L, Data.MB), 3);
+  private static final ITaskConfig TASK = ITaskConfig.build(new TaskConfig()
+      .setNumCpus(1.0)
+      .setRamMb(1024)
+      .setDiskMb(2048)
+      .setRequestedPorts(ImmutableSet.of("http", "debug")));
 
   @Test
   public void testMaxElements() {
-    Resources highRAM = new Resources(1, Amount.of(8L, Data.GB), Amount.of(10L, Data.MB), 0);
-    Resources rest = new Resources(10, Amount.of(1L, Data.MB), Amount.of(10L, Data.GB), 1);
+    ResourceSlot highRAM = new ResourceSlot(1, Amount.of(8L, Data.GB), Amount.of(10L, Data.MB), 0);
+    ResourceSlot rest = new ResourceSlot(10, Amount.of(1L, Data.MB), Amount.of(10L, Data.GB), 1);
 
-    Resources result = ResourceSlot.maxElements(highRAM, rest);
+    ResourceSlot result = ResourceSlot.maxElements(highRAM, rest);
     assertEquals(result.getNumCpus(), 10, 0.001);
     assertEquals(result.getRam(), Amount.of(8L, Data.GB));
     assertEquals(result.getDisk(), Amount.of(10L, Data.GB));
@@ -45,16 +67,86 @@ public class ResourceSlotTest {
 
   @Test
   public void testSubtract() {
-    assertEquals(ONE, ResourceSlot.subtract(TWO, ONE));
-    assertEquals(TWO, ResourceSlot.subtract(THREE, ONE));
-    assertEquals(NEGATIVE_ONE, ResourceSlot.subtract(ONE, TWO));
-    assertEquals(NEGATIVE_ONE, ResourceSlot.subtract(TWO, THREE));
+    assertEquals(ONE, TWO.subtract(ONE));
+    assertEquals(TWO, THREE.subtract(ONE));
+    assertEquals(NEGATIVE_ONE, ONE.subtract(TWO));
+    assertEquals(NEGATIVE_ONE, TWO.subtract(THREE));
+  }
+
+  @Test
+  public void testAdd() {
+    assertEquals(TWO, ONE.add(ONE));
+    assertEquals(THREE, ONE.add(TWO));
+    assertEquals(THREE, TWO.add(ONE));
+  }
+
+  @Test
+  public void testToResourceList() {
+    ResourceSlot resources = ResourceSlot.from(TASK);
+    Set<Integer> ports = ImmutableSet.of(80, 443);
+    assertEquals(
+        ImmutableSet.of(
+            makeMesosResource(CPUS, TASK.getNumCpus()),
+            makeMesosResource(RAM_MB, TASK.getRamMb()),
+            makeMesosResource(DISK_MB, TASK.getDiskMb()),
+            makeMesosRangeResource(PORTS, ports)),
+        ImmutableSet.copyOf(resources.toResourceList(ports)));
+  }
+
+  @Test
+  public void testToResourceListNoPorts() {
+    ResourceSlot resources = ResourceSlot.from(TASK);
+    assertEquals(
+        ImmutableSet.of(
+            makeMesosResource(CPUS, TASK.getNumCpus()),
+            makeMesosResource(RAM_MB, TASK.getRamMb()),
+            makeMesosResource(DISK_MB, TASK.getDiskMb())),
+        ImmutableSet.copyOf(resources.toResourceList(ImmutableSet.of())));
+  }
+
+  @Test
+  public void testRangeResourceEmpty() {
+    expectRanges(ImmutableSet.of(), ImmutableSet.of());
+  }
+
+  @Test
+  public void testRangeResourceOneEntry() {
+    expectRanges(ImmutableSet.of(Pair.of(5L, 5L)), ImmutableSet.of(5));
+    expectRanges(ImmutableSet.of(Pair.of(0L, 0L)), ImmutableSet.of(0));
+  }
+
+  @Test
+  public void testRangeResourceNonContiguous() {
+    expectRanges(ImmutableSet.of(Pair.of(1L, 1L), Pair.of(3L, 3L), Pair.of(5L, 5L)),
+        ImmutableSet.of(5, 1, 3));
   }
 
   @Test
-  public void testSum() {
-    assertEquals(TWO, ResourceSlot.sum(ONE, ONE));
-    assertEquals(THREE, ResourceSlot.sum(ONE, TWO));
-    assertEquals(THREE, ResourceSlot.sum(TWO, ONE));
+  public void testRangeResourceContiguous() {
+    expectRanges(ImmutableSet.of(Pair.of(1L, 2L), Pair.of(4L, 5L), Pair.of(7L, 9L)),
+        ImmutableSet.of(8, 2, 4, 5, 7, 9, 1));
+  }
+
+  @Test
+  public void testEqualsBadType() {
+    ResourceSlot resources = ResourceSlot.from(TASK);
+    assertNotEquals(resources, "Hello");
+    assertNotEquals(resources, null);
+  }
+
+  private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) {
+    Protos.Resource resource = makeMesosRangeResource(PORTS, values);
+    assertEquals(Protos.Value.Type.RANGES, resource.getType());
+    assertEquals(PORTS.getName(), resource.getName());
+
+    Set<Pair<Long, Long>> actual = ImmutableSet.copyOf(Iterables.transform(
+        resource.getRanges().getRangeList(),
+        new Function<Protos.Value.Range, Pair<Long, Long>>() {
+          @Override
+          public Pair<Long, Long> apply(Protos.Value.Range range) {
+            return Pair.of(range.getBegin(), range.getEnd());
+          }
+        }));
+    assertEquals(expected, actual);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
index 8bc56b2..313cf68 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
@@ -15,16 +15,10 @@ package org.apache.aurora.scheduler;
 
 import java.util.Set;
 
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.twitter.common.collections.Pair;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
 
-import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.Resources.InsufficientResourcesException;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
@@ -32,14 +26,11 @@ import org.apache.mesos.Protos.Value.Ranges;
 import org.apache.mesos.Protos.Value.Type;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class ResourcesTest {
-
-  private static final String NAME = "resource_name";
-
   @Test
   public void testPortRangeExact() {
     Resource portsResource = createPortRange(Pair.of(1, 5));
@@ -107,7 +98,7 @@ public class ResourcesTest {
     }
 
     return Resource.newBuilder()
-        .setName(Resources.PORTS)
+        .setName(PORTS.getName())
         .setType(Type.RANGES)
         .setRanges(ranges)
         .build();
@@ -121,97 +112,4 @@ public class ResourcesTest {
         .setHostname("hostname")
         .addResources(resources).build();
   }
-
-  @Test
-  public void testRangeResourceEmpty() {
-    expectRanges(ImmutableSet.of(), ImmutableSet.of());
-  }
-
-  @Test
-  public void testRangeResourceOneEntry() {
-    expectRanges(ImmutableSet.of(Pair.of(5L, 5L)), ImmutableSet.of(5));
-    expectRanges(ImmutableSet.of(Pair.of(0L, 0L)), ImmutableSet.of(0));
-  }
-
-  @Test
-  public void testRangeResourceNonContiguous() {
-    expectRanges(ImmutableSet.of(Pair.of(1L, 1L), Pair.of(3L, 3L), Pair.of(5L, 5L)),
-        ImmutableSet.of(5, 1, 3));
-  }
-
-  @Test
-  public void testRangeResourceContiguous() {
-    expectRanges(ImmutableSet.of(Pair.of(1L, 2L), Pair.of(4L, 5L), Pair.of(7L, 9L)),
-        ImmutableSet.of(8, 2, 4, 5, 7, 9, 1));
-  }
-
-  private static final ITaskConfig TASK = ITaskConfig.build(new TaskConfig()
-      .setNumCpus(1.0)
-      .setRamMb(1024)
-      .setDiskMb(2048)
-      .setRequestedPorts(ImmutableSet.of("http", "debug")));
-
-  @Test
-  public void testAccessors() {
-    Resources resources = Resources.from(TASK);
-    assertEquals(TASK.getNumCpus(), resources.getNumCpus(), 1e-9);
-    assertEquals(Amount.of(TASK.getRamMb(), Data.MB), resources.getRam());
-    assertEquals(Amount.of(TASK.getDiskMb(), Data.MB), resources.getDisk());
-    assertEquals(TASK.getRequestedPorts().size(), resources.getNumPorts());
-  }
-
-  @Test
-  public void testToResourceList() {
-    Resources resources = Resources.from(TASK);
-    Set<Integer> ports = ImmutableSet.of(80, 443);
-    assertEquals(
-        ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getRamMb()),
-            Resources.makeMesosResource(Resources.DISK_MB, TASK.getDiskMb()),
-            Resources.makeMesosRangeResource(Resources.PORTS, ports)),
-        ImmutableSet.copyOf(resources.toResourceList(ports)));
-  }
-
-  @Test
-  public void testToResourceListInversible() {
-    Resources resources = Resources.from(TASK);
-    Resources inverse = Resources.from(resources.toResourceList(ImmutableSet.of(80, 443)));
-    assertEquals(resources, inverse);
-    assertEquals(resources.hashCode(), inverse.hashCode());
-  }
-
-  @Test
-  public void testEqualsBadType() {
-    Resources resources = Resources.from(TASK);
-    assertNotEquals(resources, "Hello");
-    assertNotEquals(resources, null);
-  }
-
-  @Test
-  public void testToResourceListNoPorts() {
-    Resources resources = Resources.from(TASK);
-    assertEquals(
-        ImmutableSet.of(
-            Resources.makeMesosResource(Resources.CPUS, TASK.getNumCpus()),
-            Resources.makeMesosResource(Resources.RAM_MB, TASK.getRamMb()),
-            Resources.makeMesosResource(Resources.DISK_MB, TASK.getDiskMb())),
-        ImmutableSet.copyOf(resources.toResourceList(ImmutableSet.of())));
-  }
-
-  private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) {
-    Resource resource = Resources.makeMesosRangeResource(NAME, values);
-    assertEquals(Type.RANGES, resource.getType());
-    assertEquals(NAME, resource.getName());
-
-    Set<Pair<Long, Long>> actual = ImmutableSet.copyOf(Iterables.transform(
-        resource.getRanges().getRangeList(),
-        new Function<Range, Pair<Long, Long>>() {
-          @Override
-          public Pair<Long, Long> apply(Range range) {
-            return Pair.of(range.getBegin(), range.getEnd());
-          }
-        }));
-    assertEquals(expected, actual);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 0151dd1..37772d0 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -76,7 +76,7 @@ import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
@@ -197,7 +197,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         bind(DriverFactory.class).toInstance(driverFactory);
         bind(DriverSettings.class).toInstance(SETTINGS);
         bind(Log.class).toInstance(log);
-        Resources executorOverhead = new Resources(
+        ResourceSlot executorOverhead = new ResourceSlot(
             0.1,
             Amount.of(1L, Data.MB),
             Amount.of(0L, Data.MB),

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/app/local/simulator/ClusterSimulatorModule.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/simulator/ClusterSimulatorModule.java b/src/test/java/org/apache/aurora/scheduler/app/local/simulator/ClusterSimulatorModule.java
index 24278ba..31d8803 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/simulator/ClusterSimulatorModule.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/simulator/ClusterSimulatorModule.java
@@ -24,12 +24,15 @@ import com.google.inject.multibindings.Multibinder;
 import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.base.Command;
 
-import org.apache.aurora.scheduler.Resources;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Offer;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.mesos.Protos.Value.Type.RANGES;
 import static org.apache.mesos.Protos.Value.Type.SCALAR;
@@ -86,13 +89,13 @@ public class ClusterSimulatorModule extends AbstractModule {
 
     String host = slaveId + "-hostname";
     return Offer.newBuilder()
-        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(Resources.CPUS)
+        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(CPUS.getName())
             .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu)))
-        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(Resources.RAM_MB)
+        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(RAM_MB.getName())
             .setScalar(Protos.Value.Scalar.newBuilder().setValue(ramMb)))
-        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(Resources.DISK_MB)
+        .addResources(Protos.Resource.newBuilder().setType(SCALAR).setName(DISK_MB.getName())
             .setScalar(Protos.Value.Scalar.newBuilder().setValue(diskMb)))
-        .addResources(Protos.Resource.newBuilder().setType(RANGES).setName(Resources.PORTS)
+        .addResources(Protos.Resource.newBuilder().setType(RANGES).setName(PORTS.getName())
             .setRanges(portRanges))
         .addAttributes(Protos.Attribute.newBuilder().setType(Protos.Value.Type.TEXT)
             .setName("host")

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index abbc8e3..608af1a 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -46,7 +46,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
       .setDiskMb(1024));
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK);
   private static final UnusedResource RESOURCE = new UnusedResource(
-      ResourceSlot.from(TASK, TaskExecutors.NO_OVERHEAD_EXECUTOR),
+      ResourceSlot.from(TASK).withOverhead(TaskExecutors.NO_OVERHEAD_EXECUTOR),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
   private static final ResourceRequest REQUEST =
       new ResourceRequest(TASK, AttributeAggregate.EMPTY);

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 6441ffd..b2327a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -34,6 +34,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -57,9 +58,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
 import static org.junit.Assert.assertEquals;
 
 public class SchedulingFilterImplTest extends EasyMockTest {
-
-  private static final String TASK_ID = "taskId";
-
   private static final String HOST_A = "hostA";
   private static final String HOST_B = "hostB";
   private static final String HOST_C = "hostC";
@@ -84,8 +82,8 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private static final int DEFAULT_CPUS = 4;
   private static final long DEFAULT_RAM = 1000;
   private static final long DEFAULT_DISK = 2000;
-  private static final ResourceSlot DEFAULT_OFFER = ResourceSlot.from(
-      Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80)));
+  private static final ResourceSlot DEFAULT_OFFER = Resources.from(
+      Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80))).slot();
 
   private SchedulingFilter defaultFilter;
 
@@ -109,8 +107,8 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testSufficientPorts() {
     control.replay();
 
-    ResourceSlot twoPorts = ResourceSlot.from(
-        Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81)));
+    ResourceSlot twoPorts = Resources.from(
+        Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81))).slot();
 
     ITaskConfig noPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
         .newBuilder()

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index c74cc7f..02fe964 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -45,6 +45,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.ResourceSlot.MIN_THERMOS_RESOURCES;
+import static org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl.RESOURCES_EPSILON;
 import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXECUTOR;
 import static org.junit.Assert.assertEquals;
@@ -88,7 +89,7 @@ public class MesosTaskFactoryImplTest {
       .setExecutorId(MesosTaskFactoryImpl.getExecutorId(TASK.getTaskId()))
       .setName(MesosTaskFactoryImpl.EXECUTOR_NAME)
       .setSource(MesosTaskFactoryImpl.getInstanceSourceName(TASK.getTask(), TASK.getInstanceId()))
-      .addAllResources(MesosTaskFactoryImpl.RESOURCES_EPSILON.toResourceList())
+      .addAllResources(RESOURCES_EPSILON.toResourceList())
       .setCommand(CommandInfo.newBuilder()
           .setValue("./executor.pex")
           .addUris(URI.newBuilder().setValue(NO_OVERHEAD_EXECUTOR.getExecutorPath())
@@ -166,7 +167,7 @@ public class MesosTaskFactoryImplTest {
 
   private void checkTaskResources(ITaskConfig task, TaskInfo taskInfo) {
     assertEquals(
-        ResourceSlot.sum(Resources.from(task), config.getExecutorOverhead()),
+        ResourceSlot.from(task).withOverhead(config),
         getTotalTaskResources(taskInfo));
   }
 
@@ -235,9 +236,18 @@ public class MesosTaskFactoryImplTest {
     assertTrue(taskInfo.getExecutor().getContainer().getVolumesList().contains(expected));
   }
 
-  private static Resources getTotalTaskResources(TaskInfo task) {
-    Resources taskResources = Resources.from(task.getResourcesList());
-    Resources executorResources = Resources.from(task.getExecutor().getResourcesList());
-    return ResourceSlot.sum(taskResources, executorResources);
+  private static ResourceSlot getTotalTaskResources(TaskInfo task) {
+    Resources taskResources = fromResourceList(task.getResourcesList());
+    Resources executorResources = fromResourceList(task.getExecutor().getResourcesList());
+    return taskResources.slot().add(executorResources.slot());
+  }
+
+  private static Resources fromResourceList(Iterable<Protos.Resource> resources) {
+    return Resources.from(Protos.Offer.newBuilder()
+        .setId(Protos.OfferID.newBuilder().setValue("ignored"))
+        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("ignored"))
+        .setSlaveId(SlaveID.newBuilder().setValue("ignored"))
+        .setHostname("ignored")
+        .addAllResources(resources).build());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java b/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java
index e29829d..b48f32a 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.mesos;
 
 import com.twitter.common.collections.Pair;
 
-import org.apache.aurora.scheduler.Resources;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
@@ -26,6 +25,11 @@ import org.apache.mesos.Protos.Value.Ranges;
 import org.apache.mesos.Protos.Value.Scalar;
 import org.apache.mesos.Protos.Value.Type;
 
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
+
 public final class Offers {
   private Offers() {
     // Utility class.
@@ -43,13 +47,13 @@ public final class Offers {
         .build();
 
     return Offer.newBuilder()
-        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.CPUS)
+        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(CPUS.getName())
             .setScalar(Scalar.newBuilder().setValue(cpu)))
-        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.RAM_MB)
+        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(RAM_MB.getName())
             .setScalar(Scalar.newBuilder().setValue(ramMb)))
-        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.DISK_MB)
+        .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(DISK_MB.getName())
             .setScalar(Scalar.newBuilder().setValue(diskMb)))
-        .addResources(Resource.newBuilder().setType(Type.RANGES).setName(Resources.PORTS)
+        .addResources(Resource.newBuilder().setType(Type.RANGES).setName(PORTS.getName())
             .setRanges(portRanges))
         .addAttributes(Protos.Attribute.newBuilder().setType(Type.TEXT)
             .setName("host")

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/mesos/TaskExecutors.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/TaskExecutors.java b/src/test/java/org/apache/aurora/scheduler/mesos/TaskExecutors.java
index 14820f2..6a80503 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/TaskExecutors.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/TaskExecutors.java
@@ -16,7 +16,7 @@ package org.apache.aurora.scheduler.mesos;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 
 /**
  * Utility class to contain constants related to setting up executor settings.
@@ -40,6 +40,6 @@ public final class TaskExecutors {
           .setExecutorPath(EXECUTOR_PATH)
           .setThermosObserverRoot("/var/run/thermos")
           .setExecutorOverhead(
-              new Resources(0.01, Amount.of(256L, Data.MB), Amount.of(0L, Data.MB), 0))
+              new ResourceSlot(0.01, Amount.of(256L, Data.MB), Amount.of(0L, Data.MB), 0))
           .build();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 4bae195..66f20c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -36,7 +36,7 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
@@ -412,7 +412,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
       Amount<Long, Data> disk,
       int numPorts) {
 
-    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+    List<Resource> resources = new ResourceSlot(cpu, ram, disk, numPorts).toResourceList();
     Offer.Builder builder = Offer.newBuilder();
     builder.getIdBuilder().setValue(offerId);
     builder.getFrameworkIdBuilder().setValue("framework-id");

http://git-wip-us.apache.org/repos/asf/aurora/blob/3676ec25/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 99bc80a..88958d1 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -27,7 +27,7 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -96,6 +96,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
       .setSlaveId(MESOS_OFFER.getSlaveId())
       .build();
   private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+  private static final UnusedResource UNUSED =
+      new UnusedResource(Resources.from(MESOS_OFFER).slot(), OFFER.getAttributes());
+  private static final ResourceRequest RESOURCE_REQUEST =
+      new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY);
 
   private MutableStoreProvider storeProvider;
   private StateManager stateManager;
@@ -118,10 +122,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   public void testAssignNoVetoes() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
-        .andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
         Tasks.id(TASK),
@@ -146,9 +147,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   public void testAssignVetoesWithStaticBan() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
-    expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+    expect(filter.filter(UNUSED, RESOURCE_REQUEST))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
@@ -164,9 +163,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   @Test
   public void testAssignVetoesWithNoStaticBan() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+    expect(filter.filter(UNUSED, RESOURCE_REQUEST))
         .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
 
     control.replay();
@@ -184,10 +181,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
-    expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
-        .andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
         Tasks.id(TASK),
@@ -250,10 +244,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         IHostAttributes.build(new HostAttributes()));
 
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
-    expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
-        .andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
     expect(stateManager.assignTask(
         storeProvider,
         Tasks.id(TASK),
@@ -294,12 +285,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
     expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(mismatched.getOffer()), mismatched.getAttributes()),
+        new UnusedResource(
+            Resources.from(mismatched.getOffer()).slot(),
+            mismatched.getAttributes()),
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
     offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY);
     expect(filter.filter(
-        new UnusedResource(ResourceSlot.from(OFFER.getOffer()), OFFER.getAttributes()),
+        new UnusedResource(Resources.from(OFFER.getOffer()).slot(), OFFER.getAttributes()),
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
         .andReturn(ImmutableSet.of());