You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/05/06 19:14:10 UTC

[1/2] aurora git commit: Replacing IResourceAggregate in resource calculations.

Repository: aurora
Updated Branches:
  refs/heads/master d702587d2 -> 3687c6a1a


http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index d591d0f..2e97a33 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -38,7 +38,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -54,10 +54,16 @@ import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.gen.Resource.diskMb;
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl.updateQuery;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -92,9 +98,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     IScheduledTask prodDedicatedTask = prodDedicatedTask("foo2", 5, 5, 5);
     IScheduledTask nonProdSharedTask = nonProdTask("bar1", 2, 2, 2);
     IScheduledTask nonProdDedicatedTask = nonProdDedicatedTask("bar2", 7, 7, 7);
-    IResourceAggregate quota = aggregate(4, 4, 4);
 
-    expectQuota(quota);
+    expectQuota(aggregate(4, 4, 4));
     expectTasks(prodSharedTask, nonProdSharedTask, prodDedicatedTask, nonProdDedicatedTask);
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
     expectCronJobs(
@@ -104,12 +109,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        new QuotaInfo(
-            aggregate(4, 4, 4),
-            aggregate(6, 6, 6),
-            aggregate(5, 5, 5),
-            aggregate(9, 9, 9),
-            aggregate(7, 7, 7)),
+        new QuotaInfo(bag(4, 4, 4), bag(6, 6, 6), bag(5, 5, 5), bag(9, 9, 9), bag(7, 7, 7)),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -117,9 +117,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testGetQuotaInfoWithCronTasks() {
     IScheduledTask prodTask = prodTask("pc", 6, 6, 6);
     IScheduledTask nonProdTask = prodTask("npc", 7, 7, 7);
-    IResourceAggregate quota = aggregate(4, 4, 4);
 
-    expectQuota(quota);
+    expectQuota(aggregate(4, 4, 4));
     expectTasks(prodTask, nonProdTask);
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
 
@@ -142,7 +141,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        new QuotaInfo(aggregate(4, 4, 4), aggregate(7, 7, 7), EMPTY, aggregate(10, 10, 10), EMPTY),
+        new QuotaInfo(bag(4, 4, 4), bag(7, 7, 7), EMPTY, bag(10, 10, 10), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -152,9 +151,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     IScheduledTask updatingProdTask = createTask(JOB_NAME, "id1", 3, 3, 3, true, 1);
     IScheduledTask updatingFilteredProdTask = createTask(JOB_NAME, "id0", 3, 3, 3, true, 0);
     IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false, 0);
-    IResourceAggregate quota = aggregate(4, 4, 4);
 
-    expectQuota(quota);
+    expectQuota(aggregate(4, 4, 4));
     expectTasks(prodTask, updatingProdTask, updatingFilteredProdTask, nonProdTask);
     expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
     expectNoCronJobs();
@@ -163,15 +161,13 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     // Expected consumption from: prodTask + updatingProdTask + job update.
     assertEquals(
-        new QuotaInfo(aggregate(4, 4, 4), aggregate(7, 7, 7), EMPTY, aggregate(2, 2, 2), EMPTY),
+        new QuotaInfo(bag(4, 4, 4), bag(7, 7, 7), EMPTY, bag(2, 2, 2), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
   @Test
   public void testGetQuotaInfoNoTasksNoUpdatesNoCronJobs() {
-    IResourceAggregate quota = aggregate(4, 4, 4);
-
-    expectQuota(quota);
+    expectQuota(aggregate(4, 4, 4));
     expectNoTasks();
     expectNoJobUpdates();
     expectNoCronJobs();
@@ -179,7 +175,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        new QuotaInfo(aggregate(4, 4, 4), aggregate(0, 0, 0), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(4, 4, 4), bag(0, 0, 0), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -332,7 +328,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     QuotaCheckResult checkQuota =
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 2, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
-    assertTrue(checkQuota.getDetails().get().contains("DISK"));
+    assertTrue(checkQuota.getDetails().get().contains(ResourceType.DISK_MB.getAuroraName()));
   }
 
   @Test
@@ -350,7 +346,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(2, 2, 2, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(7, 7, 7), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(7, 7, 7), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -369,7 +365,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -387,7 +383,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(8, 8, 8), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(8, 8, 8), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -405,7 +401,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(7, 7, 7), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(7, 7, 7), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -423,7 +419,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(5, 5, 5), EMPTY, aggregate(1, 1, 1), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(5, 5, 5), EMPTY, bag(1, 1, 1), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -440,7 +436,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -457,7 +453,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -474,7 +470,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -503,7 +499,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(4, 4, 4), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -532,7 +528,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(4, 4, 4), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -561,7 +557,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -589,7 +585,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -614,7 +610,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(4, 4, 4), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -642,7 +638,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(6, 6, 6), aggregate(6, 6, 6), EMPTY, aggregate(0, 0, 0), EMPTY),
+        new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -721,7 +717,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoTasks();
     expectQuota(aggregate(1, 1, 1));
 
-    storageUtil.quotaStore.saveQuota(ROLE, EMPTY);
+    storageUtil.quotaStore.saveQuota(ROLE, aggregate(0, 0, 0));
 
     control.replay();
     quotaManager.saveQuota(ROLE, aggregate(0, 0, 0), storageUtil.mutableStoreProvider);
@@ -762,7 +758,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkCronUpdate(createJob(prodTask("pc", 1, 1, 1), 2), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(7, 7, 7), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(7, 7, 7), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -782,7 +778,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkCronUpdate(createJob(prodTask("pc", 5, 5, 5), 1), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, aggregate(7, 7, 7), EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, bag(7, 7, 7), EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -802,7 +798,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkCronUpdate(createJob(prodTask("pc", 5, 5, 5), 2), storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), aggregate(4, 4, 4), EMPTY, EMPTY, EMPTY),
+        new QuotaInfo(bag(5, 5, 5), bag(4, 4, 4), EMPTY, EMPTY, EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -820,7 +816,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         quotaManager.checkCronUpdate(createJob(prodTask("pc", 5, 5, 5), 1), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
-        new QuotaInfo(aggregate(5, 5, 5), EMPTY, EMPTY, EMPTY, EMPTY),
+        new QuotaInfo(bag(5, 5, 5), EMPTY, EMPTY, EMPTY, EMPTY),
         quotaManager.getQuotaInfo(ROLE, storeProvider));
   }
 
@@ -968,9 +964,12 @@ public class QuotaManagerImplTest extends EasyMockTest {
     ScheduledTask builder = TaskTestUtil.makeTask(taskId, JobKeys.from(ROLE, ENV, jobName))
         .newBuilder();
     builder.getAssignedTask().setInstanceId(instanceId);
-    builder.getAssignedTask().getTask().setNumCpus(cpus)
+    builder.getAssignedTask().getTask()
+        .setNumCpus(cpus)
         .setRamMb(ramMb)
         .setDiskMb(diskMb)
+        .setRequestedPorts(ImmutableSet.of("a"))
+        .setResources(ImmutableSet.of(numCpus(cpus), ramMb(ramMb), diskMb(diskMb), namedPort("a")))
         .setProduction(production);
     return IScheduledTask.build(builder);
   }
@@ -982,8 +981,4 @@ public class QuotaManagerImplTest extends EasyMockTest {
         .setTaskConfig(task)
         .setInstanceCount(instanceCount));
   }
-
-  private static IResourceAggregate aggregate(double numCpus, long ramMb, long diskMb) {
-    return ResourceTestUtil.nonBackfilledAggregate(numCpus, ramMb, diskMb);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
new file mode 100644
index 0000000..48724d5
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM;
+import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL;
+import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.junit.Assert.assertEquals;
+
+public class ResourceBagTest {
+  @Test
+  public void testAdd() {
+    assertEquals(LARGE, MEDIUM.add(MEDIUM));
+  }
+
+  @Test
+  public void testSubtract() {
+    assertEquals(MEDIUM, LARGE.subtract(MEDIUM));
+  }
+
+  @Test
+  public void testDivide() {
+    assertEquals(bag(16.0, 32, 16), XLARGE.divide(SMALL));
+  }
+
+  @Test
+  public void testMax() {
+    assertEquals(bag(2.0, 32, 256), bag(1.0, 32, 128).max(bag(2.0, 16, 256)));
+  }
+
+  @Test
+  public void testScale() {
+    assertEquals(bag(8.0, 128, 1024), bag(2.0, 32, 256).scale(4));
+  }
+
+  @Test
+  public void testKeyMismatch() {
+    assertEquals(
+        new ResourceBag(ImmutableMap.of(CPUS, 9.0)),
+        new ResourceBag(ImmutableMap.of(CPUS, 1.0)).add(LARGE));
+
+    assertEquals(
+        LARGE.add(new ResourceBag(ImmutableMap.of(CPUS, 1.0))),
+        new ResourceBag(ImmutableMap.of(CPUS, 9.0, RAM_MB, 16384.0, DISK_MB, 32768.0)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
index 914e553..333db30 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.resources;
 import java.util.EnumSet;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -27,10 +28,14 @@ import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Value.Scalar;
 import org.junit.Test;
 
+import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.namedPort;
 import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.gen.Resource.ramMb;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
@@ -51,6 +56,13 @@ public class ResourceManagerTest {
 
     Protos.Resource resource2 = Protos.Resource.newBuilder()
         .setType(SCALAR)
+        .setName(CPUS.getMesosName())
+        .setRevocable(Protos.Resource.RevocableInfo.getDefaultInstance())
+        .setScalar(Scalar.newBuilder().setValue(1.0).build())
+        .build();
+
+    Protos.Resource resource3 = Protos.Resource.newBuilder()
+        .setType(SCALAR)
         .setName(RAM_MB.getMesosName())
         .setScalar(Scalar.newBuilder().setValue(64).build())
         .build();
@@ -60,14 +72,20 @@ public class ResourceManagerTest {
         .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
         .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
         .setHostname("hostname")
-        .addAllResources(ImmutableSet.of(resource1, resource2)).build();
+        .addAllResources(ImmutableSet.of(resource1, resource2, resource3)).build();
 
     assertEquals(
-        resource1,
-        Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, CPUS)));
+        ImmutableSet.of(resource1, resource2),
+        ImmutableSet.copyOf(ResourceManager.getOfferResources(offer, CPUS)));
     assertEquals(
-        resource2,
+        resource3,
         Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, RAM_MB)));
+    assertEquals(
+        ImmutableSet.of(resource1, resource3),
+        ImmutableSet.copyOf(ResourceManager.getNonRevocableOfferResources(offer)));
+    assertEquals(
+        ImmutableSet.of(resource2, resource3),
+        ImmutableSet.copyOf(ResourceManager.getRevocableOfferResources(offer)));
   }
 
   @Test
@@ -78,6 +96,11 @@ public class ResourceManagerTest {
     assertEquals(
         IResource.build(namedPort("http")),
         Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), PORTS)));
+    assertEquals(
+        ImmutableSet.of(IResource.build(numCpus(1.0)), IResource.build(ramMb(1024))),
+        ImmutableSet.copyOf(ResourceManager.getTaskResources(
+            makeTask("id", JOB).getAssignedTask().getTask(),
+            EnumSet.of(CPUS, RAM_MB))));
   }
 
   @Test
@@ -98,9 +121,46 @@ public class ResourceManagerTest {
         mesosScalar(RAM_MB, 64),
         mesosRange(PORTS, 1, 3));
 
-    assertEquals(7.0, ResourceManager.quantityOf(resources, CPUS), 0.0);
-    assertEquals(64, ResourceManager.quantityOf(resources, RAM_MB), 0.0);
-    assertEquals(0.0, ResourceManager.quantityOf(resources, DISK_MB), 0.0);
-    assertEquals(2, ResourceManager.quantityOf(resources, PORTS), 0.0);
+    assertEquals(7.0, ResourceManager.quantityOfMesosResource(resources, CPUS), 0.0);
+    assertEquals(64, ResourceManager.quantityOfMesosResource(resources, RAM_MB), 0.0);
+    assertEquals(0.0, ResourceManager.quantityOfMesosResource(resources, DISK_MB), 0.0);
+    assertEquals(2, ResourceManager.quantityOfMesosResource(resources, PORTS), 0.0);
+  }
+
+  @Test
+  public void testResourceQuantity() {
+    assertEquals(
+        8.0,
+        ResourceManager.quantityOf(ImmutableSet.of(
+            IResource.build(numCpus(3.0)),
+            IResource.build(numCpus(5.0)))),
+        0.0);
+  }
+
+  @Test
+  public void testBagFromResources() {
+    assertEquals(
+        bag(2.0, 32, 64),
+        ResourceManager.bagFromResources(ImmutableSet.of(
+            IResource.build(numCpus(2.0)),
+            IResource.build(ramMb(32)),
+            IResource.build(diskMb(64)))));
+  }
+
+  @Test
+  public void testBagFromMesosResources() {
+    assertEquals(
+        new ResourceBag(ImmutableMap.of(CPUS, 3.0)),
+        ResourceManager.bagFromMesosResources(ImmutableSet.of(mesosScalar(CPUS, 3.0))));
+  }
+
+  @Test
+  public void testBagFromAggregate() {
+    assertEquals(bag(1.0, 32, 64), ResourceManager.bagFromAggregate(aggregate(1.0, 32, 64)));
+  }
+
+  @Test
+  public void testAggregateFromBag() {
+    assertEquals(aggregate(1.0, 1024, 4096), ResourceManager.aggregateFromBag(ResourceBag.SMALL));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java
index 0e6a5ac..842572c 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java
@@ -56,18 +56,6 @@ public class ResourceSlotTest {
       .setRequestedPorts(ImmutableSet.of("http", "debug")));
 
   @Test
-  public void testMaxElements() {
-    ResourceSlot highRAM = new ResourceSlot(1, Amount.of(8L, Data.GB), Amount.of(10L, Data.MB), 0);
-    ResourceSlot rest = new ResourceSlot(10, Amount.of(1L, Data.MB), Amount.of(10L, Data.GB), 1);
-
-    ResourceSlot result = ResourceSlot.maxElements(highRAM, rest);
-    assertEquals(result.getNumCpus(), 10, 0.001);
-    assertEquals(result.getRam(), Amount.of(8L, Data.GB));
-    assertEquals(result.getDisk(), Amount.of(10L, Data.GB));
-    assertEquals(result.getNumPorts(), 1);
-  }
-
-  @Test
   public void testSubtract() {
     assertEquals(ONE, TWO.subtract(ONE));
     assertEquals(TWO, THREE.subtract(ONE));

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
index 821c47f..e0cca4b 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.resources;
 
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Optional;
@@ -44,6 +45,14 @@ public final class ResourceTestUtil {
     // Utility class.
   }
 
+  public static ResourceBag bag(Map<ResourceType, Double> resources) {
+    return new ResourceBag(resources);
+  }
+
+  public static ResourceBag bag(double numCpus, long ramMb, long diskMb) {
+    return ResourceManager.bagFromAggregate(aggregate(numCpus, ramMb, diskMb));
+  }
+
   public static IResourceAggregate aggregate(double numCpus, long ramMb, long diskMb) {
     return IResourceAggregate.build(new ResourceAggregate(numCpus, ramMb, diskMb, ImmutableSet.of(
         numCpus(numCpus),
@@ -52,13 +61,6 @@ public final class ResourceTestUtil {
     )));
   }
 
-  public static IResourceAggregate nonBackfilledAggregate(double numCpus, long ramMb, long diskMb) {
-    return IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(numCpus)
-        .setRamMb(ramMb)
-        .setDiskMb(diskMb));
-  }
-
   public static ITaskConfig resetPorts(ITaskConfig config, Set<String> portNames) {
     TaskConfig builder = config.newBuilder()
         .setRequestedPorts(portNames);

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
index 185338e..716769f 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
@@ -55,7 +55,7 @@ public class ResourcesTest {
 
     assertEquals(
         new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
-        Resources.from(offer).filter(Resources.REVOCABLE).slot());
+        Resources.from(offer).filter(ResourceManager.REVOCABLE).slot());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
index 7fcf47a..75bac6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
@@ -17,16 +17,18 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.mesos.Protos;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.apache.aurora.scheduler.stats.AsyncStatsModule.OfferAdapter;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -41,8 +43,10 @@ public class AsyncStatsModuleTest extends EasyMockTest {
             .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))
             .setSlaveId(Protos.SlaveID.newBuilder().setValue("slaveId"))
             .setHostname("hostName")
-            .addResources(getCpuResource(true, 2.0))
-            .addResources(getCpuResource(false, 4.0))
+            .addResources(mesosScalar(CPUS, 2.0, true))
+            .addResources(mesosScalar(CPUS, 4.0, false))
+            .addResources(mesosScalar(RAM_MB, 1024))
+            .addResources(mesosScalar(DISK_MB, 2048))
             .build(),
             IHostAttributes.build(new HostAttributes()))));
 
@@ -55,21 +59,8 @@ public class AsyncStatsModuleTest extends EasyMockTest {
 
   private static MachineResource resource(boolean revocable, double cpu) {
     return new MachineResource(
-        IResourceAggregate.build(new ResourceAggregate().setNumCpus(cpu)),
+        ResourceTestUtil.bag(cpu, 1024, 2048),
         false,
         revocable);
   }
-
-  private static Protos.Resource getCpuResource(boolean revocable, double value) {
-    Protos.Resource.Builder builder = Protos.Resource.newBuilder()
-        .setName(ResourceType.CPUS.getMesosName())
-        .setType(Protos.Value.Type.SCALAR)
-        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
-
-    if (revocable) {
-      builder.setRevocable(Protos.Resource.RevocableInfo.newBuilder());
-    }
-
-    return builder.build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
index b1c8f75..e529e67 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
@@ -21,23 +21,25 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.scheduler.resources.ResourceAggregates;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate;
+import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class SlotSizeCounterTest extends EasyMockTest {
+  private static final ResourceBag LARGE = SMALL.scale(4);
 
-  private static final IResourceAggregate SMALL = aggregate(1.0, 1024, 4096);
-  private static final IResourceAggregate LARGE = ResourceAggregates.scale(SMALL, 4);
-
-  private static final Map<String, IResourceAggregate> SLOT_SIZES = ImmutableMap.of(
+  private static final Map<String, ResourceBag> SLOT_SIZES = ImmutableMap.of(
       "small", SMALL,
       "large", LARGE);
 
@@ -105,7 +107,7 @@ public class SlotSizeCounterTest extends EasyMockTest {
   @Test
   public void testTinyOffers() {
     expectStatExport();
-    expectGetSlots(new MachineResource(aggregate(0.1, 1, 1), false, false));
+    expectGetSlots(new MachineResource(bag(0.1, 1, 1), false, false));
 
     control.replay();
 
@@ -124,7 +126,7 @@ public class SlotSizeCounterTest extends EasyMockTest {
   public void testStarvedResourceVector() {
     expectStatExport();
     expectGetSlots(
-        new MachineResource(aggregate(1000, 16384, 1), false, false));
+        new MachineResource(bag(1000, 16384, 1), false, false));
 
     control.replay();
 
@@ -148,11 +150,11 @@ public class SlotSizeCounterTest extends EasyMockTest {
         new MachineResource(LARGE, false, false),
         new MachineResource(LARGE, false, true),
         new MachineResource(LARGE, true, true),
-        new MachineResource(ResourceAggregates.scale(LARGE, 4), false, false),
-        new MachineResource(aggregate(1, 1, 1), false, false),
+        new MachineResource(LARGE.scale(4), false, false),
+        new MachineResource(bag(1, 1, 1), false, false),
         new MachineResource(SMALL, true, false),
         new MachineResource(SMALL, true, false),
-        new MachineResource(ResourceAggregates.scale(SMALL, 2), true, false));
+        new MachineResource(SMALL.scale(2), true, false));
 
     control.replay();
 
@@ -166,4 +168,10 @@ public class SlotSizeCounterTest extends EasyMockTest {
     assertEquals(1, largeRevocableCounter.get());
     assertEquals(1, largeDedicatedRevocableCounter.get());
   }
+
+  private static ResourceBag bag(double cpus, double ram, double disk) {
+    // Add default port count to simulate actual machine resources.
+    return ResourceTestUtil.bag(
+        ImmutableMap.of(CPUS, cpus, RAM_MB, ram, DISK_MB, disk, PORTS, 3.0));
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index aeab07d..e0cf602 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -51,7 +51,7 @@ import org.apache.aurora.gen.storage.StoredCronJob;
 import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.resources.ResourceAggregates;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.MigrationManager;
@@ -69,6 +69,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.common.inject.Bindings.KeyFactory.PLAIN;
 import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import static org.apache.aurora.scheduler.storage.db.DbModule.testModuleWithWorkQueue;
 import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorage;
@@ -194,7 +195,7 @@ public class SnapshotStoreImplIT {
       .setTaskConfig(TASK_CONFIG.newBuilder()));
   private static final String ROLE = "role";
   private static final IResourceAggregate QUOTA =
-      ThriftBackfill.backfillResourceAggregate(ResourceAggregates.LARGE.newBuilder());
+      ThriftBackfill.backfillResourceAggregate(aggregateFromBag(ResourceBag.LARGE).newBuilder());
   private static final IHostAttributes ATTRIBUTES = IHostAttributes.build(
       new HostAttributes("host", ImmutableSet.of(new Attribute("attr", ImmutableSet.of("value"))))
           .setMode(MaintenanceMode.NONE)

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
index a2e2395..4f81585 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
@@ -45,11 +45,11 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IResult;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -74,7 +74,7 @@ final class Fixtures {
   private static final Function<String, ResponseDetail> MESSAGE_TO_DETAIL =
       message -> new ResponseDetail().setMessage(message);
   static final String CRON_SCHEDULE = "0 * * * *";
-  static final IResourceAggregate QUOTA = ResourceTestUtil.aggregate(10.0, 1024, 2048);
+  static final ResourceBag QUOTA = ResourceTestUtil.bag(10.0, 1024, 2048);
   static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
   static final InstanceKey INSTANCE_KEY = new InstanceKey(JOB_KEY.newBuilder(), 0);

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java
index 3a2b3f3..2122f74 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImplTest.java
@@ -91,10 +91,11 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
 import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
 import static org.apache.aurora.scheduler.base.Numbers.toRanges;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.LARGE;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.MEDIUM;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.SMALL;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.XLARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM;
+import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL;
+import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
 import static org.apache.aurora.scheduler.thrift.Fixtures.CRON_JOB;
 import static org.apache.aurora.scheduler.thrift.Fixtures.CRON_SCHEDULE;
 import static org.apache.aurora.scheduler.thrift.Fixtures.IDENTITY;
@@ -323,11 +324,11 @@ public class ReadOnlySchedulerImplTest extends EasyMockTest {
     control.replay();
 
     GetQuotaResult expected = new GetQuotaResult()
-        .setQuota(QUOTA.newBuilder())
-        .setProdSharedConsumption(XLARGE.newBuilder())
-        .setProdDedicatedConsumption(LARGE.newBuilder())
-        .setNonProdSharedConsumption(MEDIUM.newBuilder())
-        .setNonProdDedicatedConsumption(SMALL.newBuilder());
+        .setQuota(aggregateFromBag(QUOTA).newBuilder())
+        .setProdSharedConsumption(aggregateFromBag(XLARGE).newBuilder())
+        .setProdDedicatedConsumption(aggregateFromBag(LARGE).newBuilder())
+        .setNonProdSharedConsumption(aggregateFromBag(MEDIUM).newBuilder())
+        .setNonProdDedicatedConsumption(aggregateFromBag(SMALL).newBuilder());
 
     Response response = assertOkResponse(thrift.getQuota(ROLE));
     assertEquals(expected, response.getResult().getGetQuotaResult());


[2/2] aurora git commit: Replacing IResourceAggregate in resource calculations.

Posted by ma...@apache.org.
Replacing IResourceAggregate in resource calculations.

Reviewed at https://reviews.apache.org/r/46997/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3687c6a1
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3687c6a1
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3687c6a1

Branch: refs/heads/master
Commit: 3687c6a1a9961433eb254f8e25127d028d9003f8
Parents: d702587
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri May 6 12:13:53 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri May 6 12:13:53 2016 -0700

----------------------------------------------------------------------
 .../preemptor/PreemptionVictimFilter.java       |   3 +-
 .../scheduler/quota/QuotaCheckResult.java       |  57 ++----
 .../aurora/scheduler/quota/QuotaInfo.java       |  32 ++--
 .../aurora/scheduler/quota/QuotaManager.java    | 123 ++++++-------
 .../scheduler/resources/AcceptedOffer.java      |   4 +-
 .../resources/AuroraResourceConverter.java      |  46 +++++
 .../scheduler/resources/ResourceAggregates.java |  88 ---------
 .../aurora/scheduler/resources/ResourceBag.java | 178 +++++++++++++++++++
 .../scheduler/resources/ResourceManager.java    | 151 +++++++++++++++-
 .../scheduler/resources/ResourceSlot.java       |  22 ---
 .../scheduler/resources/ResourceType.java       |  69 ++++++-
 .../aurora/scheduler/resources/Resources.java   |  30 +---
 .../apache/aurora/scheduler/sla/SlaGroup.java   |  59 +++---
 .../scheduler/stats/AsyncStatsModule.java       |  40 ++---
 .../aurora/scheduler/stats/SlotSizeCounter.java |  45 +++--
 .../scheduler/storage/log/ThriftBackfill.java   |   3 +-
 .../scheduler/thrift/ReadOnlySchedulerImpl.java |  17 +-
 .../scheduler/quota/QuotaCheckResultTest.java   |  55 ++----
 .../scheduler/quota/QuotaManagerImplTest.java   |  85 +++++----
 .../scheduler/resources/ResourceBagTest.java    |  66 +++++++
 .../resources/ResourceManagerTest.java          |  76 +++++++-
 .../scheduler/resources/ResourceSlotTest.java   |  12 --
 .../scheduler/resources/ResourceTestUtil.java   |  16 +-
 .../scheduler/resources/ResourcesTest.java      |   2 +-
 .../scheduler/stats/AsyncStatsModuleTest.java   |  29 ++-
 .../scheduler/stats/SlotSizeCounterTest.java    |  32 ++--
 .../storage/log/SnapshotStoreImplIT.java        |   5 +-
 .../aurora/scheduler/thrift/Fixtures.java       |   4 +-
 .../thrift/ReadOnlySchedulerImplTest.java       |  19 +-
 29 files changed, 869 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index 9a37ee7..032ab2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceSlot;
 import org.apache.aurora.scheduler.resources.Resources;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -98,7 +99,7 @@ public interface PreemptionVictimFilter {
     }
 
     private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        offer -> Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot();
+        offer -> Resources.from(offer.getOffer()).filter(ResourceManager.NON_REVOCABLE).slot();
 
     private static final Function<HostOffer, String> OFFER_TO_HOST =
         offer -> offer.getOffer().getHostname();

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
index 3437c65..99f034f 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
@@ -16,10 +16,13 @@ package org.apache.aurora.scheduler.quota;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceType;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.resources.ResourceBag.IS_NEGATIVE;
+
 /**
  * Calculates and formats detailed quota comparison result.
  */
@@ -40,21 +43,6 @@ public class QuotaCheckResult {
     INSUFFICIENT_QUOTA
   }
 
-  enum Resource {
-    CPU("core(s)"),
-    RAM("MB"),
-    DISK("MB");
-
-    private final String unit;
-    Resource(String unit) {
-      this.unit = unit;
-    }
-
-    String getUnit() {
-      return unit;
-    }
-  }
-
   private final Optional<String> details;
   private final Result result;
 
@@ -86,34 +74,25 @@ public class QuotaCheckResult {
     return details;
   }
 
-  static QuotaCheckResult greaterOrEqual(IResourceAggregate a, IResourceAggregate b) {
+  static QuotaCheckResult greaterOrEqual(ResourceBag a, ResourceBag b) {
     StringBuilder details = new StringBuilder();
-    boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details)
-        & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details)
-        & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
+    ResourceBag difference = a.subtract(b);
+    difference.getResourceVectors().entrySet().stream()
+        .filter(IS_NEGATIVE)
+        .forEach(entry -> addMessage(entry.getKey(), Math.abs(entry.getValue()), details));
 
     return new QuotaCheckResult(
-        result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA,
+        details.length() > 0 ? Result.INSUFFICIENT_QUOTA : Result.SUFFICIENT_QUOTA,
         Optional.of(details.toString()));
   }
 
-  private static boolean compare(
-      double a,
-      double b,
-      Resource resource,
-      StringBuilder details) {
-
-    boolean result = a >= b;
-    if (!result) {
-      details
-          .append(details.length() > 0 ? "; " : "")
-          .append(resource)
-          .append(" quota exceeded by ")
-          .append(String.format("%.2f", b - a))
-          .append(" ")
-          .append(resource.getUnit());
-    }
-
-    return result;
+  private static void addMessage(ResourceType resourceType, Double overage, StringBuilder details) {
+    details
+        .append(details.length() > 0 ? "; " : "")
+        .append(resourceType.getAuroraName())
+        .append(" quota exceeded by ")
+        .append(String.format("%.2f", overage))
+        .append(" ")
+        .append(resourceType.getAuroraUnit());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
index 1df21b8..6990351 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
@@ -17,7 +17,7 @@ import java.util.Objects;
 
 import com.google.common.base.MoreObjects;
 
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 
 import static java.util.Objects.requireNonNull;
 
@@ -25,18 +25,18 @@ import static java.util.Objects.requireNonNull;
  * Wraps allocated quota and consumption details.
  */
 public class QuotaInfo {
-  private final IResourceAggregate quota;
-  private final IResourceAggregate prodSharedConsumption;
-  private final IResourceAggregate prodDedicatedConsumption;
-  private final IResourceAggregate nonProdSharedConsumption;
-  private final IResourceAggregate nonProdDedicatedConsumption;
+  private final ResourceBag quota;
+  private final ResourceBag prodSharedConsumption;
+  private final ResourceBag prodDedicatedConsumption;
+  private final ResourceBag nonProdSharedConsumption;
+  private final ResourceBag nonProdDedicatedConsumption;
 
   QuotaInfo(
-      IResourceAggregate quota,
-      IResourceAggregate prodSharedConsumption,
-      IResourceAggregate prodDedicatedConsumption,
-      IResourceAggregate nonProdSharedConsumption,
-      IResourceAggregate nonProdDedicatedConsumption) {
+      ResourceBag quota,
+      ResourceBag prodSharedConsumption,
+      ResourceBag prodDedicatedConsumption,
+      ResourceBag nonProdSharedConsumption,
+      ResourceBag nonProdDedicatedConsumption) {
 
     this.quota = requireNonNull(quota);
     this.prodSharedConsumption = requireNonNull(prodSharedConsumption);
@@ -50,7 +50,7 @@ public class QuotaInfo {
    *
    * @return Available quota.
    */
-  public IResourceAggregate getQuota() {
+  public ResourceBag getQuota() {
     return quota;
   }
 
@@ -59,7 +59,7 @@ public class QuotaInfo {
    *
    * @return Production job consumption.
    */
-  public IResourceAggregate getProdSharedConsumption() {
+  public ResourceBag getProdSharedConsumption() {
     return prodSharedConsumption;
   }
 
@@ -68,7 +68,7 @@ public class QuotaInfo {
    *
    * @return Production dedicated job consumption.
    */
-  public IResourceAggregate getProdDedicatedConsumption() {
+  public ResourceBag getProdDedicatedConsumption() {
     return prodDedicatedConsumption;
   }
 
@@ -77,7 +77,7 @@ public class QuotaInfo {
    *
    * @return Non production job consumption.
    */
-  public IResourceAggregate getNonProdSharedConsumption() {
+  public ResourceBag getNonProdSharedConsumption() {
     return nonProdSharedConsumption;
   }
 
@@ -86,7 +86,7 @@ public class QuotaInfo {
    *
    * @return Non production dedicated job consumption.
    */
-  public IResourceAggregate getNonProdDedicatedConsumption() {
+  public ResourceBag getNonProdDedicatedConsumption() {
     return nonProdDedicatedConsumption;
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index bf476aa..6d0d120 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -13,9 +13,10 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.StreamSupport;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -30,12 +31,13 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.RangeSet;
 
 import org.apache.aurora.gen.JobUpdateQuery;
-import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.resources.ResourceAggregates;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -63,7 +65,14 @@ import static com.google.common.base.Predicates.not;
 import static com.google.common.base.Predicates.or;
 
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceBag.IS_NEGATIVE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromAggregate;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
+import static org.apache.aurora.scheduler.resources.ResourceManager.getTaskResources;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds;
 
 /**
@@ -78,6 +87,8 @@ public interface QuotaManager {
   Predicate<ITaskConfig> NON_PROD_SHARED = and(not(PROD), not(DEDICATED));
   Predicate<ITaskConfig> NON_PROD_DEDICATED = and(not(PROD), DEDICATED);
 
+  EnumSet<ResourceType> QUOTA_RESOURCE_TYPES = EnumSet.of(CPUS, RAM_MB, DISK_MB);
+
   /**
    * Saves a new quota for the provided role or overrides the existing one.
    *
@@ -161,10 +172,9 @@ public interface QuotaManager {
       }
 
       QuotaInfo info = getQuotaInfo(ownerRole, Optional.absent(), storeProvider);
-      IResourceAggregate prodConsumption = info.getProdSharedConsumption();
-      if (quota.getNumCpus() < prodConsumption.getNumCpus()
-          || quota.getRamMb() < prodConsumption.getRamMb()
-          || quota.getDiskMb() < prodConsumption.getDiskMb()) {
+      ResourceBag prodConsumption = info.getProdSharedConsumption();
+      ResourceBag overage = bagFromAggregate(quota).subtract(prodConsumption);
+      if (overage.getResourceVectors().entrySet().stream().anyMatch(IS_NEGATIVE)) {
         throw new QuotaException(String.format(
             "Quota: %s is less then current prod reservation: %s",
             quota.toString(),
@@ -191,8 +201,8 @@ public interface QuotaManager {
       }
 
       QuotaInfo quotaInfo = getQuotaInfo(template.getJob().getRole(), storeProvider);
-      IResourceAggregate requestedTotal =
-          add(quotaInfo.getProdSharedConsumption(), scale(template, instances));
+      ResourceBag requestedTotal =
+          quotaInfo.getProdSharedConsumption().add(scale(template, instances));
 
       return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal);
     }
@@ -231,13 +241,12 @@ public interface QuotaManager {
       Optional<IJobConfiguration> oldCron =
           storeProvider.getCronJobStore().fetchJob(cronConfig.getKey());
 
-      IResourceAggregate oldResource = oldCron.isPresent() ? scale(oldCron.get()) : EMPTY;
+      ResourceBag oldResource = oldCron.isPresent() ? scale(oldCron.get()) : EMPTY;
 
       // Calculate requested total as a sum of current prod consumption and a delta between
       // new and old cron templates.
-      IResourceAggregate requestedTotal = add(
-          quotaInfo.getProdSharedConsumption(),
-          subtract(scale(cronConfig), oldResource));
+      ResourceBag requestedTotal =
+          quotaInfo.getProdSharedConsumption().add(scale(cronConfig).subtract(oldResource));
 
       return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal);
     }
@@ -280,14 +289,16 @@ public interface QuotaManager {
               .uniqueIndex(IJobConfiguration::getKey);
 
       return new QuotaInfo(
-          storeProvider.getQuotaStore().fetchQuota(role).or(EMPTY),
+          storeProvider.getQuotaStore().fetchQuota(role)
+              .transform(ResourceManager::bagFromAggregate)
+              .or(EMPTY),
           getConsumption(tasks, updates, cronTemplates, PROD_SHARED),
           getConsumption(tasks, updates, cronTemplates, PROD_DEDICATED),
           getConsumption(tasks, updates, cronTemplates, NON_PROD_SHARED),
           getConsumption(tasks, updates, cronTemplates, NON_PROD_DEDICATED));
     }
 
-    private IResourceAggregate getConsumption(
+    private ResourceBag getConsumption(
         FluentIterable<IAssignedTask> tasks,
         Map<IJobKey, IJobUpdateInstructions> updatesByKey,
         Map<IJobKey, IJobConfiguration> cronTemplatesByKey,
@@ -300,21 +311,21 @@ public interface QuotaManager {
           not(in(cronTemplatesByKey.keySet())),
           Tasks::getJob);
 
-      IResourceAggregate nonCronConsumption = getNonCronConsumption(
+      ResourceBag nonCronConsumption = getNonCronConsumption(
           updatesByKey,
           filteredTasks.filter(excludeCron),
           filter);
 
-      IResourceAggregate cronConsumption = getCronConsumption(
+      ResourceBag cronConsumption = getCronConsumption(
           Iterables.filter(
               cronTemplatesByKey.values(),
               compose(filter, IJobConfiguration::getTaskConfig)),
           filteredTasks.transform(IAssignedTask::getTask));
 
-      return add(nonCronConsumption, cronConsumption);
+      return nonCronConsumption.add(cronConsumption);
     }
 
-    private static IResourceAggregate getNonCronConsumption(
+    private static ResourceBag getNonCronConsumption(
         Map<IJobKey, IJobUpdateInstructions> updatesByKey,
         FluentIterable<IAssignedTask> tasks,
         final Predicate<ITaskConfig> configFilter) {
@@ -330,20 +341,20 @@ public interface QuotaManager {
       //
       // 3. Add up the two to yield total consumption.
 
-      IResourceAggregate nonUpdateConsumption = fromTasks(tasks
+      ResourceBag nonUpdateConsumption = fromTasks(tasks
           .filter(buildNonUpdatingTasksFilter(updatesByKey))
           .transform(IAssignedTask::getTask));
 
       final Predicate<IInstanceTaskConfig> instanceFilter =
           compose(configFilter, IInstanceTaskConfig::getTask);
 
-      IResourceAggregate updateConsumption =
+      ResourceBag updateConsumption =
           addAll(Iterables.transform(updatesByKey.values(), updateResources(instanceFilter)));
 
-      return add(nonUpdateConsumption, updateConsumption);
+      return nonUpdateConsumption.add(updateConsumption);
     }
 
-    private static IResourceAggregate getCronConsumption(
+    private static ResourceBag getCronConsumption(
         Iterable<IJobConfiguration> cronTemplates,
         FluentIterable<ITaskConfig> tasks) {
 
@@ -358,9 +369,9 @@ public interface QuotaManager {
       final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(ITaskConfig::getJob);
       return addAll(Iterables.transform(
           cronTemplates,
-          config -> max(
-              scale(config.getTaskConfig(), config.getInstanceCount()),
-              fromTasks(taskConfigsByKey.get(config.getKey())))));
+          config ->
+              scale(config.getTaskConfig(), config.getInstanceCount())
+                  .max(fromTasks(taskConfigsByKey.get(config.getKey())))));
     }
 
     private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter(
@@ -405,16 +416,13 @@ public interface QuotaManager {
           .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
     }
 
-    private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES =
-        config -> IResourceAggregate.build(new ResourceAggregate()
-            .setNumCpus(config.getNumCpus())
-            .setRamMb(config.getRamMb())
-            .setDiskMb(config.getDiskMb()));
+    private static final Function<ITaskConfig, ResourceBag> QUOTA_RESOURCES =
+        config -> bagFromResources(getTaskResources(config, QUOTA_RESOURCE_TYPES));
 
-    private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES =
+    private static final Function<IInstanceTaskConfig, ResourceBag> INSTANCE_RESOURCES =
         config -> scale(config.getTask(), getUpdateInstanceCount(config.getInstances()));
 
-    private static IResourceAggregate instructionsToResources(
+    private static ResourceBag instructionsToResources(
         Iterable<IInstanceTaskConfig> instructions) {
 
       return addAll(FluentIterable.from(instructions).transform(INSTANCE_RESOURCES));
@@ -433,7 +441,7 @@ public interface QuotaManager {
      *       prod -> non-prod AND {@code prodSharedConsumption=True}: only the initial state
      *       is accounted.
      */
-    private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources(
+    private static Function<IJobUpdateInstructions, ResourceBag> updateResources(
         final Predicate<IInstanceTaskConfig> instanceFilter) {
 
       return instructions -> {
@@ -444,51 +452,26 @@ public interface QuotaManager {
             instanceFilter);
 
         // Calculate result as max(existing, desired) per resource type.
-        return max(
-            instructionsToResources(initialState),
-            instructionsToResources(desiredState));
+        return instructionsToResources(initialState).max(instructionsToResources(desiredState));
       };
     }
 
-    private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
-      return addAll(Arrays.asList(a, b));
-    }
-
-    private static IResourceAggregate addAll(Iterable<IResourceAggregate> aggregates) {
-      IResourceAggregate total = EMPTY;
-      for (IResourceAggregate aggregate : aggregates) {
-        total = IResourceAggregate.build(new ResourceAggregate()
-            .setNumCpus(total.getNumCpus() + aggregate.getNumCpus())
-            .setRamMb(total.getRamMb() + aggregate.getRamMb())
-            .setDiskMb(total.getDiskMb() + aggregate.getDiskMb()));
-      }
-      return total;
-    }
-
-    private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(a.getNumCpus() - b.getNumCpus())
-          .setRamMb(a.getRamMb() - b.getRamMb())
-          .setDiskMb(a.getDiskMb() - b.getDiskMb()));
-    }
-
-    private static IResourceAggregate max(IResourceAggregate a, IResourceAggregate b) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(Math.max(a.getNumCpus(), b.getNumCpus()))
-          .setRamMb(Math.max(a.getRamMb(), b.getRamMb()))
-          .setDiskMb(Math.max(a.getDiskMb(), b.getDiskMb())));
+    private static ResourceBag addAll(Iterable<ResourceBag> aggregates) {
+      return StreamSupport.stream(aggregates.spliterator(), false)
+          .reduce((l, r) -> l.add(r))
+          .orElse(EMPTY);
     }
 
-    private static IResourceAggregate scale(ITaskConfig taskConfig, int instanceCount) {
-      return ResourceAggregates.scale(CONFIG_RESOURCES.apply(taskConfig), instanceCount);
+    private static ResourceBag scale(ITaskConfig taskConfig, int instanceCount) {
+      return QUOTA_RESOURCES.apply(taskConfig).scale(instanceCount);
     }
 
-    private static IResourceAggregate scale(IJobConfiguration jobConfiguration) {
+    private static ResourceBag scale(IJobConfiguration jobConfiguration) {
       return scale(jobConfiguration.getTaskConfig(), jobConfiguration.getInstanceCount());
     }
 
-    private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
-      return addAll(Iterables.transform(tasks, CONFIG_RESOURCES));
+    private static ResourceBag fromTasks(Iterable<ITaskConfig> tasks) {
+      return addAll(Iterables.transform(tasks, QUOTA_RESOURCES));
     }
 
     private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
index a735b0b..fce6621 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java
@@ -97,7 +97,7 @@ public final class AcceptedOffer {
     List<Resource.Builder> cpuResources = filterToBuilders(
         reservedFirst,
         ResourceType.CPUS.getMesosName(),
-        revocable ? Resources.REVOCABLE : Resources.NON_REVOCABLE);
+        revocable ? ResourceManager.REVOCABLE : ResourceManager.NON_REVOCABLE);
     List<Resource.Builder> memResources = filterToBuilderNonRevocable(
         reservedFirst, ResourceType.RAM_MB.getMesosName());
     List<Resource.Builder> diskResources = filterToBuilderNonRevocable(
@@ -230,6 +230,6 @@ public final class AcceptedOffer {
       List<Resource> resources,
       String name) {
 
-    return filterToBuilders(resources, name, Resources.NON_REVOCABLE);
+    return filterToBuilders(resources, name, ResourceManager.NON_REVOCABLE);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java b/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java
index f9c89a9..59f5fde 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java
@@ -38,6 +38,22 @@ public interface AuroraResourceConverter<T> {
     return value.toString();
   }
 
+  /**
+   * Gets resource quantity.
+   *
+   * @param value Value to quantify.
+   * @return Resource quantity.
+   */
+  Double quantify(Object value);
+
+  /**
+   * Converts resource quantity to matching resource value type (if such conversion exists).
+   *
+   * @param value Resource quantity.
+   * @return Value of type T.
+   */
+  T valueOf(Double value);
+
   LongConverter LONG = new LongConverter();
   DoubleConverter DOUBLE = new DoubleConverter();
   StringConverter STRING = new StringConverter();
@@ -47,6 +63,16 @@ public interface AuroraResourceConverter<T> {
     public Long parseFrom(String value) {
       return Longs.tryParse(value);
     }
+
+    @Override
+    public Double quantify(Object value) {
+      return (double) (long) value;
+    }
+
+    @Override
+    public Long valueOf(Double value) {
+      return value.longValue();
+    }
   }
 
   class DoubleConverter implements AuroraResourceConverter<Double> {
@@ -54,6 +80,16 @@ public interface AuroraResourceConverter<T> {
     public Double parseFrom(String value) {
       return Double.parseDouble(value);
     }
+
+    @Override
+    public Double quantify(Object value) {
+      return (Double) value;
+    }
+
+    @Override
+    public Double valueOf(Double value) {
+      return value;
+    }
   }
 
   class StringConverter implements AuroraResourceConverter<String> {
@@ -61,5 +97,15 @@ public interface AuroraResourceConverter<T> {
     public String parseFrom(String value) {
       return value;
     }
+
+    @Override
+    public Double quantify(Object value) {
+      return 1.0;
+    }
+
+    @Override
+    public String valueOf(Double value) {
+      throw new UnsupportedOperationException("Unsupported for string resource types");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java
deleted file mode 100644
index 1d19b32..0000000
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.resources;
-
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-
-/**
- * Convenience class for normalizing resource measures between tasks and offers.
- */
-public final class ResourceAggregates {
-
-  public static final IResourceAggregate EMPTY =
-      IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(0)
-          .setRamMb(0)
-          .setDiskMb(0)
-      );
-
-  public static final IResourceAggregate SMALL =
-      IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(1.0)
-          .setRamMb(1024)
-          .setDiskMb(4096)
-      );
-
-  public static final IResourceAggregate MEDIUM =
-      IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(4.0)
-          .setRamMb(8192)
-          .setDiskMb(16384)
-      );
-
-  public static final IResourceAggregate LARGE =
-      IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(8.0)
-          .setRamMb(16384)
-          .setDiskMb(32768)
-      );
-
-  public static final IResourceAggregate XLARGE =
-      IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(16.0)
-          .setRamMb(32768)
-          .setDiskMb(65536)
-      );
-
-  private ResourceAggregates() {
-    // Utility class.
-  }
-
-  /**
-   * a * m.
-   */
-  public static IResourceAggregate scale(IResourceAggregate a, int m) {
-    return IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(a.getNumCpus() * m)
-        .setRamMb(a.getRamMb() * m)
-        .setDiskMb(a.getDiskMb() * m));
-  }
-
-  /**
-   * a / b.
-   * <p>
-   * This calculates how many times {@code b} "fits into" {@code a}.  Behavior is undefined when
-   * {@code b} contains resources with a value of zero.
-   */
-  public static int divide(IResourceAggregate a, IResourceAggregate b) {
-    return Ordering.natural().min(
-        a.getNumCpus() / b.getNumCpus(),
-        (double) a.getRamMb() / b.getRamMb(),
-        (double) a.getDiskMb() / b.getDiskMb()
-    ).intValue();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
new file mode 100644
index 0000000..7916ec0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BinaryOperator;
+import java.util.function.Predicate;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import static java.util.stream.Collectors.toMap;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+
+/**
+ * A bag of unique resource values aggregated by {@link ResourceType}.
+ */
+public class ResourceBag {
+  public static final ResourceBag EMPTY = new ResourceBag(ImmutableMap.of(
+      CPUS, 0.0,
+      RAM_MB, 0.0,
+      DISK_MB, 0.0
+  ));
+
+  public static final ResourceBag SMALL = new ResourceBag(ImmutableMap.of(
+      CPUS, 1.0,
+      RAM_MB, 1024.0,
+      DISK_MB, 4096.0
+  ));
+
+  public static final ResourceBag MEDIUM = new ResourceBag(ImmutableMap.of(
+      CPUS, 4.0,
+      RAM_MB, 8192.0,
+      DISK_MB, 16384.0
+  ));
+
+  public static final ResourceBag LARGE = new ResourceBag(ImmutableMap.of(
+      CPUS, 8.0,
+      RAM_MB, 16384.0,
+      DISK_MB, 32768.0
+  ));
+
+  public static final ResourceBag XLARGE = new ResourceBag(ImmutableMap.of(
+      CPUS, 16.0,
+      RAM_MB, 32768.0,
+      DISK_MB, 65536.0
+  ));
+
+  public static final Predicate<Map.Entry<ResourceType, Double>> IS_NEGATIVE =
+      entry -> entry.getValue() < 0;
+
+  public static final Predicate<Map.Entry<ResourceType, Double>> IS_POSITIVE =
+      entry -> entry.getValue() > 0;
+
+  public static final Predicate<Map.Entry<ResourceType, Double>> IS_MESOS_REVOCABLE =
+      entry -> entry.getKey().isMesosRevocable();
+
+  private final Map<ResourceType, Double> resourceVectors;
+
+  /**
+   * Creates an instance of ResourceBag with given resource vectors (type -> value).
+   *
+   * @param resourceVectors Map of resource vectors.
+   */
+  ResourceBag(Map<ResourceType, Double> resourceVectors) {
+    this.resourceVectors = ImmutableMap.copyOf(resourceVectors);
+  }
+
+  /**
+   * Gets resource vectors in the bag.
+   *
+   * @return Map of resource vectors.
+   */
+  public Map<ResourceType, Double> getResourceVectors() {
+    return resourceVectors;
+  }
+
+  /**
+   * Adds this and other bag contents.
+   *
+   * @param other Other bag to add.
+   * @return Result of addition.
+   */
+  public ResourceBag add(ResourceBag other) {
+    return binaryOp(other, (l, r) -> l + r);
+  }
+
+  /**
+   * Subtracts other bag contents from this.
+   *
+   * @param other Other bag to subtract.
+   * @return Result of subtraction.
+   */
+  public ResourceBag subtract(ResourceBag other) {
+    return binaryOp(other, (l, r) -> l - r);
+  }
+
+  /**
+   * Divides this by other bag contents.
+   *
+   * @param other Other bag to divide by.
+   * @return Result of division.
+   */
+  public ResourceBag divide(ResourceBag other) {
+    return binaryOp(other, (l, r) -> l / r);
+  }
+
+  /**
+   * Applies {@code Math.max()} for each matching resource vector.
+   *
+   * @param other Other bag to compare with.
+   * @return A new bag with max resource vectors.
+   */
+  public ResourceBag max(ResourceBag other) {
+    return binaryOp(other, (l, r) -> Math.max(l, r));
+  }
+
+  /**
+   * Scales each resource vector by {@code m}.
+   *
+   * @param m Scale factor.
+   * @return Result of scale operation.
+   */
+  public ResourceBag scale(int m) {
+    return new ResourceBag(resourceVectors.entrySet().stream()
+        .collect(toMap(Map.Entry::getKey, v -> v.getValue() * m)));
+  }
+
+  private ResourceBag binaryOp(ResourceBag other, BinaryOperator<Double> operator) {
+    ImmutableMap.Builder<ResourceType, Double> builder = ImmutableMap.builder();
+    for (Map.Entry<ResourceType, Double> entry : resourceVectors.entrySet()) {
+      // Apply binary operator only on matching keys from the other. If there is no match, keep the
+      // current value unchanged.
+      builder.put(
+          entry.getKey(),
+          other.getResourceVectors().containsKey(entry.getKey())
+              ? operator.apply(entry.getValue(), other.getResourceVectors().get(entry.getKey()))
+              : entry.getValue());
+    }
+
+    return new ResourceBag(builder.build());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ResourceBag)) {
+      return false;
+    }
+
+    ResourceBag other = (ResourceBag) o;
+    return Objects.equals(resourceVectors, other.resourceVectors);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(resourceVectors);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("resourceVectors", resourceVectors).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index 943e8a4..69087e6 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -14,16 +14,23 @@
 package org.apache.aurora.scheduler.resources;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
+import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 import org.apache.mesos.Protos.Resource;
 
 import static org.apache.aurora.scheduler.resources.ResourceType.fromResource;
@@ -38,6 +45,30 @@ public final class ResourceManager {
   }
 
   /**
+   * TODO(maxim): reduce visibility by redirecting callers to #getRevocableOfferResources().
+   */
+  public static final Predicate<Resource> REVOCABLE =
+      r -> !fromResource(r).isMesosRevocable() || r.hasRevocable();
+
+  /**
+   * TODO(maxim): reduce visibility by redirecting callers to #getNonRevocableOfferResources().
+   */
+  public static final Predicate<Resource> NON_REVOCABLE = r -> !r.hasRevocable();
+
+  private static final Function<IResource, ResourceType> RESOURCE_TO_TYPE = r -> fromResource(r);
+
+  private static final Function<Resource, ResourceType> MESOS_RESOURCE_TO_TYPE =
+      r -> fromResource(r);
+
+  private static final Function<IResource, Double> QUANTIFY_RESOURCE =
+      r -> fromResource(r).getAuroraResourceConverter().quantify(r.getRawValue());
+
+  private static final Function<Resource, Double> QUANTIFY_MESOS_RESOURCE =
+      r -> fromResource(r).getMesosResourceConverter().quantify(r);
+
+  private static final BinaryOperator<Double> REDUCE_VALUES = (l, r) -> l + r;
+
+  /**
    * Gets offer resources matching specified {@link ResourceType}.
    *
    * @param offer Offer to get resources from.
@@ -49,6 +80,26 @@ public final class ResourceManager {
   }
 
   /**
+   * Gets Mesos-revocable offer resources.
+   *
+   * @param offer Offer to get resources from.
+   * @return Mesos-revocable offer resources.
+   */
+  public static Iterable<Resource> getRevocableOfferResources(Offer offer) {
+    return Iterables.filter(offer.getResourcesList(), REVOCABLE);
+  }
+
+  /**
+   * Gets non-Mesos-revocable offer resources.
+   *
+   * @param offer Offer to get resources from.
+   * @return Non-Mesos-revocable offer resources.
+   */
+  public static Iterable<Resource> getNonRevocableOfferResources(Offer offer) {
+    return Iterables.filter(offer.getResourcesList(), NON_REVOCABLE);
+  }
+
+  /**
    * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}.
    *
    * @param task Scheduled task to get resources from.
@@ -71,6 +122,20 @@ public final class ResourceManager {
   }
 
   /**
+   * Gets task resources matching any of the specified resource types.
+   *
+   * @param task Task config to get resources from.
+   * @param typesToMatch EnumSet of resource types.
+   * @return Task resources matching any of the resource types.
+   */
+  public static Iterable<IResource> getTaskResources(
+      ITaskConfig task,
+      EnumSet<ResourceType> typesToMatch) {
+
+    return Iterables.filter(task.getResources(), r -> typesToMatch.contains(fromResource(r)));
+  }
+
+  /**
    * Gets unique task resource types.
    *
    * @param task Task to get resource types from.
@@ -78,7 +143,7 @@ public final class ResourceManager {
    */
   public static Set<ResourceType> getTaskResourceTypes(IAssignedTask task) {
     return EnumSet.copyOf(task.getTask().getResources().stream()
-        .map(r -> fromResource(r))
+        .map(RESOURCE_TO_TYPE)
         .collect(Collectors.toSet()));
   }
 
@@ -87,12 +152,88 @@ public final class ResourceManager {
    *
    * @param resources Mesos resources.
    * @param type Type of resource to quantify.
-   * @return Mesos resource value.
+   * @return Aggregate Mesos resource value.
    */
-  public static Double quantityOf(Iterable<Resource> resources, ResourceType type) {
+  public static Double quantityOfMesosResource(Iterable<Resource> resources, ResourceType type) {
     return StreamSupport.stream(resources.spliterator(), false)
         .filter(r -> fromResource(r).equals(type))
-        .map(r -> fromResource(r).getMesosResourceConverter().quantify(r))
-        .reduce((l, r) -> l + r).orElse(0.0);
+        .map(QUANTIFY_MESOS_RESOURCE)
+        .reduce(REDUCE_VALUES)
+        .orElse(0.0);
+  }
+
+  /**
+   * Gets the quantity of resources. Caller to ensure all resources are of the same type.
+   *
+   * @param resources Resources to sum up.
+   * @return Aggregate resource value.
+   */
+  public static Double quantityOf(Iterable<IResource> resources) {
+    return StreamSupport.stream(resources.spliterator(), false)
+        .map(QUANTIFY_RESOURCE)
+        .reduce(REDUCE_VALUES)
+        .orElse(0.0);
+  }
+
+  /**
+   * Creates a {@link ResourceBag} from resources.
+   *
+   * @param resources Resources to convert.
+   * @return A {@link ResourceBag} instance.
+   */
+  public static ResourceBag bagFromResources(Iterable<IResource> resources) {
+    return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE);
+  }
+
+  /**
+   * Creates a {@link ResourceBag} from Mesos resources.
+   *
+   * @param resources Mesos resources to convert.
+   * @return A {@link ResourceBag} instance.
+   */
+  public static ResourceBag bagFromMesosResources(Iterable<Resource> resources) {
+    return bagFromResources(resources, MESOS_RESOURCE_TO_TYPE, QUANTIFY_MESOS_RESOURCE);
+  }
+
+  /**
+   * Creates a {@link ResourceBag} from {@link IResourceAggregate}.
+   *
+   * @param aggregate {@link IResourceAggregate} to convert.
+   * @return A {@link ResourceBag} instance.
+   */
+  public static ResourceBag bagFromAggregate(IResourceAggregate aggregate) {
+    return new ResourceBag(aggregate.getResources().stream()
+        .collect(Collectors.toMap(RESOURCE_TO_TYPE, QUANTIFY_RESOURCE)));
+  }
+
+  /**
+   * Creates a {@link IResourceAggregate} from {@link ResourceBag}.
+   *
+   * @param bag {@link ResourceBag} to convert.
+   * @return A {@link IResourceAggregate} instance.
+   */
+  public static IResourceAggregate aggregateFromBag(ResourceBag bag) {
+    return ThriftBackfill.backfillResourceAggregate(new ResourceAggregate()
+        .setResources(bag.getResourceVectors().entrySet().stream()
+            .map(e -> IResource.newBuilder(
+                e.getKey().getValue(),
+                e.getKey().getAuroraResourceConverter().valueOf(e.getValue())))
+            .collect(Collectors.toSet())));
+  }
+
+  private static <T> ResourceBag bagFromResources(
+      Iterable<T> resources,
+      Function<T, ResourceType> typeMapper,
+      Function<T, Double> valueMapper) {
+
+    return new ResourceBag(StreamSupport.stream(resources.spliterator(), false)
+        .collect(Collectors.groupingBy(typeMapper))
+        .entrySet().stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            group -> group.getValue().stream()
+                .map(valueMapper)
+                .reduce(REDUCE_VALUES)
+                .orElse(0.0))));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
index a8dee95..13922bc 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
@@ -189,28 +189,6 @@ public final class ResourceSlot {
   }
 
   /**
-   * Generates a ResourceSlot where each resource component is a max out of the two components.
-   *
-   * @param a A resource to compare.
-   * @param b A resource to compare.
-   *
-   * @return Returns a ResourceSlot instance where each component is a max of the two components.
-   */
-  @VisibleForTesting
-  static ResourceSlot maxElements(ResourceSlot a, ResourceSlot b) {
-    double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus());
-    Amount<Long, Data> maxRAM = Amount.of(
-        Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)),
-        Data.MB);
-    Amount<Long, Data> maxDisk = Amount.of(
-        Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)),
-        Data.MB);
-    int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts());
-
-    return new ResourceSlot(maxCPU, maxRAM, maxDisk, maxPorts);
-  }
-
-  /**
    * Number of CPUs.
    *
    * @return CPUs.

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
index baed3de..276320a 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
@@ -45,7 +45,17 @@ public enum ResourceType implements TEnum {
   /**
    * CPU resource.
    */
-  CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, Optional.empty(), "CPU", 16, false),
+  CPUS(
+      _Fields.NUM_CPUS,
+      SCALAR,
+      "cpus",
+      DOUBLE,
+      Optional.empty(),
+      "CPU",
+      "core(s)",
+      16,
+      false,
+      true),
 
   /**
    * RAM resource.
@@ -57,7 +67,9 @@ public enum ResourceType implements TEnum {
       LONG,
       Optional.empty(),
       "RAM",
+      "MB",
       Amount.of(24, GB).as(MB),
+      false,
       false),
 
   /**
@@ -70,13 +82,25 @@ public enum ResourceType implements TEnum {
       LONG,
       Optional.empty(),
       "disk",
+      "MB",
       Amount.of(450, GB).as(MB),
+      false,
       false),
 
   /**
    * Port resource.
    */
-  PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, Optional.of(PORT_MAPPER), "ports", 1000, true);
+  PORTS(
+      _Fields.NAMED_PORT,
+      RANGES,
+      "ports",
+      STRING,
+      Optional.of(PORT_MAPPER),
+      "ports",
+      "count",
+      1000,
+      true,
+      false);
 
   /**
    * Correspondent thrift {@link org.apache.aurora.gen.Resource} enum value.
@@ -109,6 +133,11 @@ public enum ResourceType implements TEnum {
   private final String auroraName;
 
   /**
+   * Aurora resource unit.
+   */
+  private final String auroraUnit;
+
+  /**
    * Scaling range for comparing scheduling vetoes.
    */
   private final int scalingRange;
@@ -118,6 +147,11 @@ public enum ResourceType implements TEnum {
    */
   private final boolean isMultipleAllowed;
 
+  /**
+   * Indicates if a resource can be Mesos-revocable.
+   */
+  private final boolean isMesosRevocable;
+
   private static ImmutableMap<Integer, ResourceType> byField =
       Maps.uniqueIndex(EnumSet.allOf(ResourceType.class),  ResourceType::getValue);
 
@@ -133,8 +167,10 @@ public enum ResourceType implements TEnum {
    * @param auroraResourceConverter See {@link #getAuroraResourceConverter()} for more details.
    * @param mapper See {@link #getMapper()} for more details.
    * @param auroraName See {@link #getAuroraName()} for more details.
+   * @param auroraUnit See {@link #getAuroraUnit()} for more details.
    * @param scalingRange See {@link #getScalingRange()} for more details.
    * @param isMultipleAllowed See {@link #isMultipleAllowed()} for more details.
+   * @param isMesosRevocable See {@link #isMesosRevocable()} for more details.
    */
   ResourceType(
       _Fields value,
@@ -143,17 +179,21 @@ public enum ResourceType implements TEnum {
       AuroraResourceConverter<?> auroraResourceConverter,
       Optional<ResourceMapper> mapper,
       String auroraName,
+      String auroraUnit,
       int scalingRange,
-      boolean isMultipleAllowed) {
+      boolean isMultipleAllowed,
+      boolean isMesosRevocable) {
 
     this.value = value;
     this.mesosResourceConverter = requireNonNull(mesosResourceConverter);
     this.mesosName = requireNonNull(mesosName);
     this.auroraResourceConverter = requireNonNull(auroraResourceConverter);
-    this.auroraName = requireNonNull(auroraName);
     this.mapper = requireNonNull(mapper);
+    this.auroraName = requireNonNull(auroraName);
+    this.auroraUnit = requireNonNull(auroraUnit);
     this.scalingRange = scalingRange;
     this.isMultipleAllowed = isMultipleAllowed;
+    this.isMesosRevocable = isMesosRevocable;
   }
 
   /**
@@ -215,6 +255,15 @@ public enum ResourceType implements TEnum {
   }
 
   /**
+   * Gets resource unit for internal Aurora representation.
+   *
+   * @return Aurora resource unit.
+   */
+  public String getAuroraUnit() {
+    return auroraUnit;
+  }
+
+  /**
    * Scaling range to use for comparison of scheduling vetoes.
    * <p>
    * This has no real bearing besides trying to determine if a veto along one resource vector
@@ -237,6 +286,18 @@ public enum ResourceType implements TEnum {
   }
 
   /**
+   * Returns a flag indicating if a resource can be Mesos-revocable.
+   * <p>
+   * @see <a href="https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto/">Mesos
+   * protobuf for more details</a>
+   *
+   * @return True if a resource can be Mesos-revocable, false otherwise.
+   */
+  public boolean isMesosRevocable() {
+    return isMesosRevocable;
+  }
+
+  /**
    * Returns a {@link ResourceType} for the given ID.
    *
    * @param value ID value to search by. See {@link #getValue()}.

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
index 94cd163..2ced8dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
@@ -17,7 +17,6 @@ import java.util.Set;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
@@ -32,7 +31,9 @@ import org.apache.mesos.Protos.Value.Range;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOf;
+import static org.apache.aurora.scheduler.resources.ResourceManager.NON_REVOCABLE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.REVOCABLE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOfMesosResource;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -42,23 +43,6 @@ import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
  * A container for multiple Mesos resource vectors.
  */
 public final class Resources {
-
-  /**
-   * CPU resource filter.
-   */
-  private static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getMesosName());
-
-  /**
-   * Revocable resource filter.
-   */
-  public static final Predicate<Resource> REVOCABLE =
-      Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable));
-
-  /**
-   * Non-revocable resource filter.
-   */
-  public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
-
   /**
    * Convert range to set of integers.
    */
@@ -109,10 +93,10 @@ public final class Resources {
    * @return {@code ResourceSlot} instance.
    */
   public ResourceSlot slot() {
-    return new ResourceSlot(quantityOf(mesosResources, CPUS),
-        Amount.of(quantityOf(mesosResources, RAM_MB).longValue(), Data.MB),
-        Amount.of(quantityOf(mesosResources, DISK_MB).longValue(), Data.MB),
-        quantityOf(mesosResources, PORTS).intValue());
+    return new ResourceSlot(quantityOfMesosResource(mesosResources, CPUS),
+        Amount.of(quantityOfMesosResource(mesosResources, RAM_MB).longValue(), Data.MB),
+        Amount.of(quantityOfMesosResource(mesosResources, DISK_MB).longValue(), Data.MB),
+        quantityOfMesosResource(mesosResources, PORTS).intValue());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
index 2c044a6..21121bc 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -26,14 +26,21 @@ import com.google.common.collect.Range;
 
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.LARGE;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.MEDIUM;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.SMALL;
-import static org.apache.aurora.scheduler.resources.ResourceAggregates.XLARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM;
+import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL;
+import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.getTaskResources;
+import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOf;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 
 /**
  * Defines a logical grouping criteria to be applied over a set of tasks.
@@ -56,30 +63,30 @@ interface SlaGroup {
     CLUSTER(new Cluster()),
     RESOURCE_CPU(new Resource<>(
         ImmutableMap.of(
-            "sla_cpu_small_", Range.closed(EMPTY.getNumCpus(), SMALL.getNumCpus()),
-            "sla_cpu_medium_", Range.openClosed(SMALL.getNumCpus(), MEDIUM.getNumCpus()),
-            "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()),
-            "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()),
-            "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())),
-        task -> task.getAssignedTask().getTask().getNumCpus()
+            "sla_cpu_small_", Range.closed(fromBag(EMPTY, CPUS), fromBag(SMALL, CPUS)),
+            "sla_cpu_medium_", Range.openClosed(fromBag(SMALL, CPUS), fromBag(MEDIUM, CPUS)),
+            "sla_cpu_large_", Range.openClosed(fromBag(MEDIUM, CPUS), fromBag(LARGE, CPUS)),
+            "sla_cpu_xlarge_", Range.openClosed(fromBag(LARGE, CPUS), fromBag(XLARGE, CPUS)),
+            "sla_cpu_xxlarge_", Range.greaterThan(fromBag(XLARGE, CPUS))),
+        task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), CPUS))
     )),
     RESOURCE_RAM(new Resource<>(
         ImmutableMap.of(
-            "sla_ram_small_", Range.closed(EMPTY.getRamMb(), SMALL.getRamMb()),
-            "sla_ram_medium_", Range.openClosed(SMALL.getRamMb(), MEDIUM.getRamMb()),
-            "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()),
-            "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()),
-            "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())),
-        task -> task.getAssignedTask().getTask().getRamMb()
+            "sla_ram_small_", Range.closed(fromBag(EMPTY, RAM_MB), fromBag(SMALL, RAM_MB)),
+            "sla_ram_medium_", Range.openClosed(fromBag(SMALL, RAM_MB), fromBag(MEDIUM, RAM_MB)),
+            "sla_ram_large_", Range.openClosed(fromBag(MEDIUM, RAM_MB), fromBag(LARGE, RAM_MB)),
+            "sla_ram_xlarge_", Range.openClosed(fromBag(LARGE, RAM_MB), fromBag(XLARGE, RAM_MB)),
+            "sla_ram_xxlarge_", Range.greaterThan(fromBag(XLARGE, RAM_MB))),
+        task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), RAM_MB))
     )),
     RESOURCE_DISK(new Resource<>(
         ImmutableMap.of(
-            "sla_disk_small_", Range.closed(EMPTY.getDiskMb(), SMALL.getDiskMb()),
-            "sla_disk_medium_", Range.openClosed(SMALL.getDiskMb(), MEDIUM.getDiskMb()),
-            "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()),
-            "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()),
-            "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())),
-        task -> task.getAssignedTask().getTask().getDiskMb()
+            "sla_disk_small_", Range.closed(fromBag(EMPTY, DISK_MB), fromBag(SMALL, DISK_MB)),
+            "sla_disk_medium_", Range.openClosed(fromBag(SMALL, DISK_MB), fromBag(MEDIUM, DISK_MB)),
+            "sla_disk_large_", Range.openClosed(fromBag(MEDIUM, DISK_MB), fromBag(LARGE, DISK_MB)),
+            "sla_disk_xlarge_", Range.openClosed(fromBag(LARGE, DISK_MB), fromBag(XLARGE, DISK_MB)),
+            "sla_disk_xxlarge_", Range.greaterThan(fromBag(XLARGE, DISK_MB))),
+        task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), DISK_MB))
     ));
 
     private SlaGroup group;
@@ -90,6 +97,12 @@ interface SlaGroup {
     SlaGroup getSlaGroup() {
       return group;
     }
+
+    // TODO(maxim): Refactor SLA management to build groups dynamically from
+    // all available ResourceType values.
+    private static Double fromBag(ResourceBag bag, ResourceType type) {
+      return bag.getResourceVectors().get(type);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 03dfa27..1d1415a 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -25,24 +25,22 @@ import com.google.inject.PrivateModule;
 import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceSlot;
-import org.apache.aurora.scheduler.resources.Resources;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.resources.ResourceSlot.NONE;
-import static org.apache.aurora.scheduler.resources.Resources.NON_REVOCABLE;
-import static org.apache.aurora.scheduler.resources.Resources.REVOCABLE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.IS_MESOS_REVOCABLE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.IS_POSITIVE;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
+import static org.apache.aurora.scheduler.resources.ResourceManager.getNonRevocableOfferResources;
+import static org.apache.aurora.scheduler.resources.ResourceManager.getRevocableOfferResources;
 
 /**
  * Module to configure export of cluster-wide resource allocation and consumption statistics.
@@ -152,30 +150,26 @@ public class AsyncStatsModule extends AbstractModule {
 
       ImmutableList.Builder<MachineResource> builder = ImmutableList.builder();
       for (HostOffer offer : offers) {
-        ResourceSlot revocable = Resources.from(offer.getOffer()).filter(REVOCABLE).slot();
-        ResourceSlot nonRevocable =
-            Resources.from(offer.getOffer()).filter(NON_REVOCABLE).slot();
+        ResourceBag revocable = bagFromMesosResources(getRevocableOfferResources(offer.getOffer()));
+        ResourceBag nonRevocable =
+            bagFromMesosResources(getNonRevocableOfferResources(offer.getOffer()));
         boolean isDedicated = Conversions.isDedicated(offer.getOffer());
 
-        // It's insufficient to compare revocable against NONE here as RAM, DISK and PORTS
+        // It's insufficient to compare revocable against EMPTY here as RAM, DISK and PORTS
         // are always rolled in to revocable as non-compressible resources. Only if revocable
         // CPU is non-zero should we expose the revocable resources as aggregates.
-        if (revocable.getNumCpus() > 0.0) {
-          builder.add(new MachineResource(fromSlot(revocable), isDedicated, true));
+        if (revocable.getResourceVectors().entrySet().stream()
+            .filter(IS_POSITIVE.and(IS_MESOS_REVOCABLE))
+            .findFirst()
+            .isPresent()) {
+          builder.add(new MachineResource(revocable, isDedicated, true));
         }
 
-        if (!nonRevocable.equals(NONE)) {
-          builder.add(new MachineResource(fromSlot(nonRevocable), isDedicated, false));
+        if (!nonRevocable.equals(ResourceBag.EMPTY)) {
+          builder.add(new MachineResource(nonRevocable, isDedicated, false));
         }
       }
       return builder.build();
     }
-
-    private static IResourceAggregate fromSlot(ResourceSlot slot) {
-      return IResourceAggregate.build(new ResourceAggregate()
-          .setNumCpus(slot.getNumCpus())
-          .setRamMb(slot.getRam().as(Data.MB))
-          .setDiskMb(slot.getDisk().as(Data.MB)));
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
index 1f71b00..a3ca9a9 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.stats;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
@@ -24,22 +25,27 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Ordering;
 
-import org.apache.aurora.scheduler.resources.ResourceAggregates;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE;
+import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM;
+import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL;
+import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE;
+
 /**
  * A stat computer that aggregates the number of 'slots' available at different pre-determined
  * slot sizes, broken down by dedicated and non-dedicated hosts.
  */
 class SlotSizeCounter implements Runnable {
-  private static final Map<String, IResourceAggregate> SLOT_SIZES = ImmutableMap.of(
-      "small", ResourceAggregates.SMALL,
-      "medium", ResourceAggregates.MEDIUM,
-      "large", ResourceAggregates.LARGE,
-      "xlarge", ResourceAggregates.XLARGE);
+  private static final Map<String, ResourceBag> SLOT_SIZES = ImmutableMap.of(
+      "small", SMALL,
+      "medium", MEDIUM,
+      "large", LARGE,
+      "xlarge", XLARGE);
 
   // Ensures all counters are always initialized regardless of the Resource availability.
   private static final Iterable<String> SLOT_GROUPS = ImmutableList.of(
@@ -49,13 +55,13 @@ class SlotSizeCounter implements Runnable {
       getPrefix(true, true)
   );
 
-  private final Map<String, IResourceAggregate> slotSizes;
+  private final Map<String, ResourceBag> slotSizes;
   private final MachineResourceProvider machineResourceProvider;
   private final CachedCounters cachedCounters;
 
   @VisibleForTesting
   SlotSizeCounter(
-      final Map<String, IResourceAggregate> slotSizes,
+      final Map<String, ResourceBag> slotSizes,
       MachineResourceProvider machineResourceProvider,
       CachedCounters cachedCounters) {
 
@@ -65,17 +71,17 @@ class SlotSizeCounter implements Runnable {
   }
 
   static class MachineResource {
-    private final IResourceAggregate size;
+    private final ResourceBag size;
     private final boolean dedicated;
     private final boolean revocable;
 
-    MachineResource(IResourceAggregate size, boolean dedicated, boolean revocable) {
+    MachineResource(ResourceBag size, boolean dedicated, boolean revocable) {
       this.size = requireNonNull(size);
       this.dedicated = dedicated;
       this.revocable = revocable;
     }
 
-    public IResourceAggregate getSize() {
+    public ResourceBag getSize() {
       return size;
     }
 
@@ -125,9 +131,12 @@ class SlotSizeCounter implements Runnable {
     return getPrefix(dedicated, revocable) + slotName;
   }
 
-  private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) {
-    Function<IResourceAggregate, Integer> counter =
-        machineSlack -> ResourceAggregates.divide(machineSlack, slotSize);
+  private int countSlots(Iterable<ResourceBag> slots, final ResourceBag slotSize) {
+    Function<ResourceBag, Integer> counter = machineSlack -> Ordering.natural().min(
+        machineSlack.divide(slotSize).getResourceVectors().entrySet().stream()
+            .map(entry -> entry.getValue())
+            .collect(Collectors.toSet()))
+        .intValue();
 
     int sum = 0;
     for (int slotCount : FluentIterable.from(slots).transform(counter)) {
@@ -139,14 +148,14 @@ class SlotSizeCounter implements Runnable {
   private void updateStats(
       String name,
       Iterable<MachineResource> slots,
-      IResourceAggregate slotSize) {
+      ResourceBag slotSize) {
 
-    ImmutableMultimap.Builder<String, IResourceAggregate> builder = ImmutableMultimap.builder();
+    ImmutableMultimap.Builder<String, ResourceBag> builder = ImmutableMultimap.builder();
     for (MachineResource slot : slots) {
       builder.put(getStatName(name, slot.isDedicated(), slot.isRevocable()), slot.getSize());
     }
 
-    ImmutableMultimap<String, IResourceAggregate> sizes = builder.build();
+    ImmutableMultimap<String, ResourceBag> sizes = builder.build();
 
     for (String slotGroup : SLOT_GROUPS) {
       String statName = slotGroup + name;

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
index d1c62a8..0a307fe 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java
@@ -25,6 +25,7 @@ import org.apache.aurora.gen.Resource;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
@@ -119,7 +120,7 @@ public final class ThriftBackfill {
       aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
       aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
     } else {
-      EnumSet<ResourceType> quotaResources = EnumSet.of(CPUS, RAM_MB, DISK_MB);
+      EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
       if (aggregate.getResources().size() > quotaResources.size()) {
         throw new IllegalArgumentException("Too many resource values in quota.");
       }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index bab34d8..0d4f044 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -103,6 +103,7 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
 import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
 import static org.apache.aurora.scheduler.base.Numbers.toRanges;
+import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
 import static org.apache.aurora.scheduler.thrift.Responses.error;
 import static org.apache.aurora.scheduler.thrift.Responses.invalidRequest;
 import static org.apache.aurora.scheduler.thrift.Responses.ok;
@@ -281,12 +282,16 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     MorePreconditions.checkNotBlank(ownerRole);
     return storage.read(storeProvider -> {
       QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider);
-      GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder())
-          .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder())
-          .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder())
-          .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder())
-          .setNonProdDedicatedConsumption(
-              quotaInfo.getNonProdDedicatedConsumption().newBuilder());
+      GetQuotaResult result = new GetQuotaResult()
+          .setQuota(aggregateFromBag(quotaInfo.getQuota()).newBuilder())
+          .setProdSharedConsumption(aggregateFromBag(
+              quotaInfo.getProdSharedConsumption()).newBuilder())
+          .setProdDedicatedConsumption(aggregateFromBag(
+              quotaInfo.getProdDedicatedConsumption()).newBuilder())
+          .setNonProdSharedConsumption(aggregateFromBag(
+              quotaInfo.getNonProdSharedConsumption()).newBuilder())
+          .setNonProdDedicatedConsumption(aggregateFromBag(
+              quotaInfo.getNonProdDedicatedConsumption()).newBuilder());
 
       return ok(Result.getQuotaResult(result));
     });

http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
index d989900..b6aee57 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
@@ -13,12 +13,15 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.greaterOrEqual;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -26,61 +29,31 @@ public class QuotaCheckResultTest {
 
   @Test
   public void testGreaterOrEqualPass() {
-    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    assertEquals(SUFFICIENT_QUOTA, QuotaCheckResult.greaterOrEqual(quota, request).getResult());
+    assertEquals(
+        SUFFICIENT_QUOTA,
+        greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 256, 512)).getResult());
   }
 
   @Test
   public void testGreaterOrEqualFailsCpu() {
-    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(2.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(2.0, 256, 512));
     assertEquals(INSUFFICIENT_QUOTA, result.getResult());
-    assertTrue(result.getDetails().get().contains("CPU"));
+    assertTrue(result.getDetails().get().contains(CPUS.getAuroraName()));
   }
 
   @Test
   public void testGreaterOrEqualFailsRam() {
-    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(512L)
-        .setDiskMb(512L));
-    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 512, 512));
     assertEquals(INSUFFICIENT_QUOTA, result.getResult());
     assertTrue(result.getDetails().get().length() > 0);
-    assertTrue(result.getDetails().get().contains("RAM"));
+    assertTrue(result.getDetails().get().contains(RAM_MB.getAuroraName()));
   }
 
   @Test
   public void testGreaterOrEqualFailsDisk() {
-    IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(512L));
-    IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(256L)
-        .setDiskMb(1024L));
-    QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+    QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 256, 1024));
     assertEquals(INSUFFICIENT_QUOTA, result.getResult());
     assertTrue(result.getDetails().get().length() > 0);
-    assertTrue(result.getDetails().get().contains("DISK"));
+    assertTrue(result.getDetails().get().contains(DISK_MB.getAuroraName()));
   }
 }