You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/14 18:54:09 UTC

aurora git commit: Simplify AttributeAggregate.

Repository: aurora
Updated Branches:
  refs/heads/master 9aab87f18 -> 3b29a4b79


Simplify AttributeAggregate.

Reviewed at https://reviews.apache.org/r/33106/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3b29a4b7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3b29a4b7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3b29a4b7

Branch: refs/heads/master
Commit: 3b29a4b797e137f16b4fda76cd42073a0e5b3ad5
Parents: 9aab87f
Author: Bill Farner <wf...@apache.org>
Authored: Tue Apr 14 09:53:22 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Apr 14 09:53:22 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |  18 +--
 .../scheduler/filter/AttributeAggregate.java    | 117 ++++++++++---------
 .../scheduler/async/TaskSchedulerImplTest.java  |  16 +--
 .../scheduler/async/TaskSchedulerTest.java      |  31 +++--
 .../preemptor/PendingTaskProcessorTest.java     |   8 +-
 .../async/preemptor/PreemptorImplTest.java      |  20 +---
 .../async/preemptor/PreemptorModuleTest.java    |  10 +-
 .../preemptor/PreemptorSlotFinderTest.java      |  13 +--
 .../events/NotifyingSchedulingFilterTest.java   |  18 +--
 .../filter/AttributeAggregateTest.java          |  92 +++++++--------
 .../filter/SchedulingFilterImplTest.java        | 108 +++++++----------
 .../scheduler/state/TaskAssignerImplTest.java   |  16 +--
 12 files changed, 187 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index ce87344..0113505 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Singleton;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.EventBus;
@@ -50,7 +48,6 @@ import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.Reserva
 import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -309,7 +306,6 @@ public class SchedulingBenchmarks {
    * Tests preemptor searching for a preemption slot in a completely filled up cluster.
    */
   public static class PreemptorSlotSearchBenchmark extends AbstractBase {
-
     @Override
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
@@ -327,20 +323,8 @@ public class SchedulingBenchmarks {
         @Override
         public Boolean apply(final Storage.MutableStoreProvider storeProvider) {
           IAssignedTask assignedTask = getSettings().getTask().getAssignedTask();
-          final Query.Builder query = Query.jobScoped(assignedTask.getTask().getJob())
-              .byStatus(org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES);
-
-          Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
-              new Supplier<ImmutableSet<IScheduledTask>>() {
-                @Override
-                public ImmutableSet<IScheduledTask> get() {
-                  return storeProvider.getTaskStore().fetchTasks(query);
-                }
-              });
-
           AttributeAggregate aggregate =
-              new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
-
+              AttributeAggregate.getJobActiveState(storeProvider, assignedTask.getTask().getJob());
           Optional<String> result =
               preemptor.attemptPreemptionFor(assignedTask, aggregate, storeProvider);
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index ed82ae9..bd74f89 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -13,17 +13,13 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Map;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.AtomicLongMap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
 import com.twitter.common.collections.Pair;
 
 import org.apache.aurora.scheduler.base.Query;
@@ -41,18 +37,20 @@ import static java.util.Objects.requireNonNull;
  * once the job state may change (e.g. after exiting a write transaction). This is intended to
  * capture job state once and avoid redundant queries.
  * <p>
- * Note that while the state injected into this class is used lazily (to allow for queries to happen
- * only on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the aggregation
- * result, thus invoking the {@link Supplier} and {@link AttributeStore}.
+ * TODO(wfarner): Consider preserving this as only a helper class to compute the Multiset
+ * representing the aggregate, since this class is now a thin wrapper over a Multiset.
  */
-public class AttributeAggregate {
+public final class AttributeAggregate {
 
   /**
-   * A lazily-computed mapping from attribute name and value to the count of tasks with that
-   * name/value combination.  See doc for {@link #getNumTasksWithAttribute(String, String)} for
-   * further details.
+   * A mapping from attribute name and value to the count of tasks with that name/value combination.
+   * See doc for {@link #getNumTasksWithAttribute(String, String)} for further details.
    */
-  private final Supplier<Map<Pair<String, String>, Long>> aggregate;
+  private final Supplier<Multiset<Pair<String, String>>> aggregate;
+
+  private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
+    this.aggregate = Suppliers.memoize(aggregate);
+  }
 
   /**
    * Initializes an {@link AttributeAggregate} instance from data store.
@@ -65,58 +63,72 @@ public class AttributeAggregate {
       final StoreProvider storeProvider,
       final IJobKey jobKey) {
 
-    Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
-        new Supplier<ImmutableSet<IScheduledTask>>() {
+    return create(
+        new Supplier<Iterable<IScheduledTask>>() {
           @Override
-          public ImmutableSet<IScheduledTask> get() {
-            return storeProvider.getTaskStore().fetchTasks(
-                Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
+          public Iterable<IScheduledTask> get() {
+            return storeProvider.getTaskStore()
+                .fetchTasks(Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
           }
-        });
-    return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
+        },
+        storeProvider.getAttributeStore());
   }
 
-  /**
-   * Creates a new attribute aggregate, which will be computed from the provided external state.
-   *
-   * @param activeTaskSupplier Supplier of active tasks within the aggregated scope.
-   * @param attributeStore Source of host attributes to associate with tasks.
-   */
-  public AttributeAggregate(
-      final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier,
+  @VisibleForTesting
+  static AttributeAggregate create(
+      final Supplier<Iterable<IScheduledTask>> taskSupplier,
       final AttributeStore attributeStore) {
 
-    requireNonNull(activeTaskSupplier);
-    requireNonNull(attributeStore);
-
-    final Function<IScheduledTask, Iterable<IAttribute>> getHostAttributes =
-        new Function<IScheduledTask, Iterable<IAttribute>>() {
+    final Function<String, Iterable<IAttribute>> getHostAttributes =
+        new Function<String, Iterable<IAttribute>>() {
           @Override
-          public Iterable<IAttribute> apply(IScheduledTask task) {
+          public Iterable<IAttribute> apply(String host) {
             // Note: this assumes we have access to attributes for hosts where all active tasks
             // reside.
-            String host = requireNonNull(task.getAssignedTask().getSlaveHost());
+            requireNonNull(host);
             return attributeStore.getHostAttributes(host).get().getAttributes();
           }
         };
 
-    aggregate = Suppliers.memoize(new Supplier<Map<Pair<String, String>, Long>>() {
-      @Override
-      public Map<Pair<String, String>, Long> get() {
-        AtomicLongMap<Pair<String, String>> counts = AtomicLongMap.create();
-        Iterable<IAttribute> allAttributes =
-            Iterables.concat(Iterables.transform(activeTaskSupplier.get(), getHostAttributes));
-        for (IAttribute attribute : allAttributes) {
-          for (String value : attribute.getValues()) {
-            counts.incrementAndGet(Pair.of(attribute.getName(), value));
+    return create(Suppliers.compose(
+        new Function<Iterable<IScheduledTask>, Iterable<IAttribute>>() {
+          @Override
+          public Iterable<IAttribute> apply(Iterable<IScheduledTask> tasks) {
+            return FluentIterable.from(tasks)
+                .transform(Tasks.SCHEDULED_TO_SLAVE_HOST)
+                .transformAndConcat(getHostAttributes);
+          }
+        },
+        taskSupplier));
+  }
+
+  @VisibleForTesting
+  static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) {
+    Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose(
+        new Function<Iterable<IAttribute>, Multiset<Pair<String, String>>>() {
+          @Override
+          public Multiset<Pair<String, String>> apply(Iterable<IAttribute> attributes) {
+            ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder();
+            for (IAttribute attribute : attributes) {
+              for (String value : attribute.getValues()) {
+                builder.add(Pair.of(attribute.getName(), value));
+              }
+            }
+
+            return builder.build();
           }
-        }
+        },
+        attributes
+    );
 
-        return ImmutableMap.copyOf(counts.asMap());
-      }
-    });
+    return new AttributeAggregate(aggregator);
   }
 
+  @VisibleForTesting
+  public static final AttributeAggregate EMPTY =
+      new AttributeAggregate(Suppliers.<Multiset<Pair<String, String>>>ofInstance(
+          ImmutableMultiset.<Pair<String, String>>of()));
+
   /**
    * Gets the total number of tasks with a given attribute name and value combination.
    * <p>
@@ -135,12 +147,11 @@ public class AttributeAggregate {
    * @return Number of tasks in the job whose hosts have the provided attribute name and value.
    */
   public long getNumTasksWithAttribute(String name, String value) {
-    return Optional.fromNullable(aggregate.get().get(Pair.of(name, value)))
-        .or(0L);
+    return aggregate.get().count(Pair.of(name, value));
   }
 
   @VisibleForTesting
-  Map<Pair<String, String>, Long> getAggregates() {
+  Multiset<Pair<String, String>> getAggregates() {
     return aggregate.get();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index c5643d9..b61abf9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -43,14 +42,12 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -66,6 +63,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -96,7 +94,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private Amount<Long, Time> reservationDuration;
   private Amount<Long, Time> halfReservationDuration;
   private EventSink eventSink;
-  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() throws Exception {
@@ -113,9 +110,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
     eventSink = PubsubTestUtil.startPubsub(injector);
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
   }
 
   private Injector getInjector(final Storage storageImpl) {
@@ -146,7 +140,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
         OFFER,
-        new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob)))
+        new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), EMPTY)))
         .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
   }
 
@@ -207,7 +201,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
         OFFER,
-        new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob)))
+        new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), EMPTY)))
         .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
 
     control.replay();
@@ -317,7 +311,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
         eq(OFFER),
-        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob))))
+        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), EMPTY))))
         .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
 
     control.replay();
@@ -346,7 +340,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
     expect(preemptor.attemptPreemptionFor(
         task.getAssignedTask(),
-        emptyJob,
+        EMPTY,
         storageUtil.mutableStoreProvider)).andReturn(result);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 88c0163..9c47a76 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
@@ -59,7 +58,6 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -89,6 +87,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
@@ -125,7 +124,6 @@ public class TaskSchedulerTest extends EasyMockTest {
   private StatsProvider statsProvider;
   private RescheduleCalculator rescheduleCalculator;
   private Preemptor preemptor;
-  private AttributeAggregate emptyJob;
   private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
 
   @Before
@@ -144,9 +142,6 @@ public class TaskSchedulerTest extends EasyMockTest {
     statsProvider = createMock(StatsProvider.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     preemptor = createMock(Preemptor.class);
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
   }
 
   private void replayAndCreateScheduler() {
@@ -315,11 +310,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
 
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     expectPreemptorCall(task);
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
 
     Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -350,7 +345,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
@@ -380,10 +375,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andThrow(new StorageException("Injected failure."));
+    expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall();
 
@@ -402,7 +397,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
     expectAnyMaintenanceCalls();
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expectPreemptorCall(task);
     driver.declineOffer(OFFER_A.getOffer().getId());
@@ -464,13 +459,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
+    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
+    expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -496,7 +491,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
+    expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -505,7 +500,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     HostOffer updatedOfferC = new HostOffer(
         OFFER_C.getOffer(),
         IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
-    expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
+    expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -602,7 +597,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     final IScheduledTask task = makeTask("a", PENDING);
 
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
     expectPreemptorCall(task);
 
@@ -655,7 +650,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   private void expectPreemptorCall(IScheduledTask task) {
     expect(preemptor.attemptPreemptionFor(
         eq(task.getAssignedTask()),
-        eq(emptyJob),
+        eq(EMPTY),
         EasyMock.<MutableStoreProvider>anyObject())).andReturn(Optional.<String>absent());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
index bcd1b4e..75fc16d 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async.preemptor;
 import java.util.Arrays;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -33,7 +32,6 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -68,7 +66,6 @@ public class PendingTaskProcessorTest extends EasyMockTest {
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PendingTaskProcessor slotFinder;
-  private AttributeAggregate attrAggregate;
   private PreemptionSlotCache slotCache;
   private FakeClock clock;
 
@@ -80,9 +77,6 @@ public class PendingTaskProcessorTest extends EasyMockTest {
     slotCache = createMock(PreemptionSlotCache.class);
     statsProvider = new FakeStatsProvider();
     clock = new FakeClock();
-    attrAggregate = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
 
     slotFinder = new PendingTaskProcessor(
         storageUtil.storage,
@@ -135,7 +129,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
   private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) {
     expect(preemptionSlotFinder.findPreemptionSlotFor(
         IAssignedTask.build(task.getAssignedTask()),
-        attrAggregate,
+        AttributeAggregate.EMPTY,
         storageUtil.storeProvider)).andReturn(slot);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 281f4e0..97d6087 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -14,9 +14,7 @@
 package org.apache.aurora.scheduler.async.preemptor;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
-
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -28,10 +26,8 @@ import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -43,6 +39,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -61,7 +58,6 @@ public class PreemptorImplTest extends EasyMockTest {
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PreemptorImpl preemptor;
-  private AttributeAggregate attrAggregate;
   private PreemptionSlotCache slotCache;
   private Storage.MutableStoreProvider storeProvider;
 
@@ -72,10 +68,6 @@ public class PreemptorImplTest extends EasyMockTest {
     preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
     slotCache = createMock(PreemptionSlotCache.class);
     statsProvider = new FakeStatsProvider();
-    attrAggregate = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
-
     preemptor = new PreemptorImpl(
         stateManager,
         preemptionSlotFinder,
@@ -124,15 +116,15 @@ public class PreemptorImplTest extends EasyMockTest {
   }
 
   private Optional<String> callPreemptor() {
-    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), attrAggregate, storeProvider);
+    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
   }
 
   private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) {
     expect(preemptionSlotFinder.validatePreemptionSlotFor(
-        eq(TASK.getAssignedTask()),
-        eq(attrAggregate),
-        eq(SLOT),
-        anyObject(Storage.MutableStoreProvider.class))).andReturn(victims);
+        TASK.getAssignedTask(),
+        EMPTY,
+        SLOT,
+        storeProvider)).andReturn(victims);
   }
 
   private void expectPreempted(IScheduledTask preempted) throws Exception {

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
index 7e2d1c5..9d3820a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -14,8 +14,6 @@
 package org.apache.aurora.scheduler.async.preemptor;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -33,10 +31,8 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -80,10 +76,6 @@ public class PreemptorModuleTest extends EasyMockTest {
         Amount.of(0L, Time.SECONDS),
         Amount.of(0L, Time.SECONDS)));
 
-    Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
-        createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
-    AttributeStore attributeStore = createMock(AttributeStore.class);
-
     control.replay();
 
     injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
@@ -93,7 +85,7 @@ public class PreemptorModuleTest extends EasyMockTest {
         Optional.<String>absent(),
         injector.getInstance(Preemptor.class).attemptPreemptionFor(
             IAssignedTask.build(new AssignedTask()),
-            new AttributeAggregate(taskSupplier, attributeStore),
+            AttributeAggregate.EMPTY,
             storageUtil.mutableStoreProvider));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
index b80e558..eed2de9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
@@ -20,7 +20,6 @@ import java.util.Set;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
@@ -48,16 +47,13 @@ import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.mesos.Protos;
@@ -70,6 +66,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
 import static org.easymock.EasyMock.expect;
@@ -98,7 +95,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
   private FakeStatsProvider statsProvider;
   private ClusterState clusterState;
   private OfferManager offerManager;
-  private AttributeAggregate emptyJob;
   private PreemptorMetrics preemptorMetrics;
 
   @Before
@@ -109,9 +105,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
     storageUtil.expectOperations();
     statsProvider = new FakeStatsProvider();
     preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
   }
 
   private Optional<PreemptionSlot> runSlotFinder(ScheduledTask pendingTask) {
@@ -124,7 +117,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     return slotFinder.findPreemptionSlotFor(
         IAssignedTask.build(pendingTask.getAssignedTask()),
-        emptyJob,
+        EMPTY,
         storageUtil.mutableStoreProvider);
   }
 
@@ -417,7 +410,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     Optional<ImmutableSet<PreemptionVictim>> victims = slotFinder.validatePreemptionSlotFor(
         IAssignedTask.build(p1.getAssignedTask()),
-        emptyJob,
+        EMPTY,
         slot.get(),
         storageUtil.mutableStoreProvider);
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 61cea32..2b71043 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.events;
 
 import java.util.Set;
 
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -31,9 +30,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.TaskExecutors;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +48,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final UnusedResource RESOURCE = new UnusedResource(
       ResourceSlot.from(TASK, TaskExecutors.NO_OVERHEAD_EXECUTOR),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
-  private ResourceRequest request;
+  private static final ResourceRequest REQUEST =
+      new ResourceRequest(TASK, "taskId", AttributeAggregate.EMPTY);
 
   private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
@@ -65,30 +63,26 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
     delegate = createMock(SchedulingFilter.class);
     eventSink = createMock(EventSink.class);
     filter = new NotifyingSchedulingFilter(delegate, eventSink);
-    AttributeAggregate emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
-    request = new ResourceRequest(TASK, "taskId", emptyJob);
   }
 
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
+    expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes);
     eventSink.post(new Vetoed(GROUP_KEY, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(RESOURCE, request));
+    assertEquals(vetoes, filter.filter(RESOURCE, REQUEST));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
+    expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(RESOURCE, request));
+    assertEquals(vetoes, filter.filter(RESOURCE, REQUEST));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
index 4b56576..6b36062 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
@@ -13,12 +13,11 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Map;
-
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMultiset;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multiset;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -37,65 +36,51 @@ import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class AttributeAggregateTest extends EasyMockTest {
-
-  private Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier;
   private AttributeStore attributeStore;
-  private AttributeAggregate aggregate;
 
   @Before
   public void setUp() throws Exception {
-    activeTaskSupplier = createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
     attributeStore = createMock(AttributeStore.class);
-    aggregate = new AttributeAggregate(activeTaskSupplier, attributeStore);
   }
 
   @Test
   public void testNoTasks() {
-    expectGetTasks();
-
     control.replay();
 
-    assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
-    assertAggregate("none", "alsoNone", 0);
+    AttributeAggregate aggregate = aggregate();
+    assertEquals(ImmutableMultiset.<Pair<String, String>>of(), aggregate.getAggregates());
+    assertAggregate(aggregate, "none", "alsoNone", 0);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testAttributesMissing() {
-    expectGetTasks(task("1", "a"));
     expect(attributeStore.getHostAttributes("a")).andReturn(Optional.<IHostAttributes>absent());
 
     control.replay();
 
-    aggregate.getAggregates();
+    aggregate(task("1", "a")).getAggregates();
   }
 
   @Test(expected = NullPointerException.class)
   public void testTaskWithNoHost() {
-    expectGetTasks(task("1", null));
-
     control.replay();
 
-    aggregate.getAggregates();
+    aggregate(task("1", null)).getAggregates();
   }
 
   @Test
   public void testNoAttributes() {
-    expectGetTasks(task("1", "hostA"));
     expectGetAttributes("hostA");
 
     control.replay();
 
-    assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
+    assertEquals(
+        ImmutableMultiset.<Pair<String, String>>of(),
+        aggregate(task("1", "hostA")).getAggregates());
   }
 
   @Test
   public void testAggregate() {
-    expectGetTasks(
-        task("1", "a1"),
-        task("2", "b1"),
-        task("3", "b1"),
-        task("4", "b2"),
-        task("5", "c1"));
     expectGetAttributes(
         "a1",
         attribute("host", "a1"),
@@ -121,29 +106,37 @@ public class AttributeAggregateTest extends EasyMockTest {
 
     control.replay();
 
-    Map<Pair<String, String>, Long> expected = ImmutableMap.<Pair<String, String>, Long>builder()
-        .put(Pair.of("rack", "a"), 1L)
-        .put(Pair.of("rack", "b"), 3L)
-        .put(Pair.of("rack", "c"), 1L)
-        .put(Pair.of("host", "a1"), 1L)
-        .put(Pair.of("host", "b1"), 2L)
-        .put(Pair.of("host", "b2"), 1L)
-        .put(Pair.of("host", "c1"), 1L)
-        .put(Pair.of("pdu", "p1"), 4L)
-        .put(Pair.of("pdu", "p2"), 4L)
-        .put(Pair.of("ssd", "true"), 1L)
+    Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder()
+        .add(Pair.of("rack", "a"))
+        .addCopies(Pair.of("rack", "b"), 3)
+        .add(Pair.of("rack", "c"))
+        .add(Pair.of("host", "a1"))
+        .addCopies(Pair.of("host", "b1"), 2)
+        .add(Pair.of("host", "b2"))
+        .add(Pair.of("host", "c1"))
+        .addCopies(Pair.of("pdu", "p1"), 4)
+        .addCopies(Pair.of("pdu", "p2"), 4)
+        .add(Pair.of("ssd", "true"))
         .build();
-    assertAggregates(expected);
-    for (Map.Entry<Pair<String, String>, Long> entry : expected.entrySet()) {
-      assertAggregate(entry.getKey().getFirst(), entry.getKey().getSecond(), entry.getValue());
+    AttributeAggregate aggregate = aggregate(
+        task("1", "a1"),
+        task("2", "b1"),
+        task("3", "b1"),
+        task("4", "b2"),
+        task("5", "c1"));
+    assertEquals(expected, aggregate.getAggregates());
+    for (Multiset.Entry<Pair<String, String>> entry : expected.entrySet()) {
+      Pair<String, String> element = entry.getElement();
+      assertAggregate(aggregate, element.getFirst(), element.getSecond(), entry.getCount());
     }
-    assertAggregate("host", "c2", 0L);
-    assertAggregate("hostc", "2", 0L);
+    assertAggregate(aggregate, "host", "c2", 0L);
+    assertAggregate(aggregate, "hostc", "2", 0L);
   }
 
-  private void expectGetTasks(IScheduledTask... activeTasks) {
-    expect(activeTaskSupplier.get())
-        .andReturn(ImmutableSet.<IScheduledTask>builder().add(activeTasks).build());
+  private AttributeAggregate aggregate(IScheduledTask... activeTasks) {
+    return AttributeAggregate.create(
+        Suppliers.<Iterable<IScheduledTask>>ofInstance(ImmutableSet.copyOf(activeTasks)),
+        attributeStore);
   }
 
   private IExpectationSetters<?> expectGetAttributes(String host, Attribute... attributes) {
@@ -153,11 +146,12 @@ public class AttributeAggregateTest extends EasyMockTest {
             .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()))));
   }
 
-  private void assertAggregates(Map<Pair<String, String>, Long> expected) {
-    assertEquals(expected, aggregate.getAggregates());
-  }
+  private void assertAggregate(
+      AttributeAggregate aggregate,
+      String name,
+      String value,
+      long expected) {
 
-  private void assertAggregate(String name, String value, long expected) {
     assertEquals(expected, aggregate.getNumTasksWithAttribute(name, value));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index d06b89c..26bad99 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -15,16 +15,14 @@ package org.apache.aurora.scheduler.filter;
 
 import java.util.Arrays;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
-import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.ExecutorConfig;
@@ -32,7 +30,6 @@ import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
@@ -45,20 +42,18 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
 import org.apache.aurora.scheduler.mesos.Offers;
 import org.apache.aurora.scheduler.mesos.TaskExecutors;
-import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
-import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class SchedulingFilterImplTest extends EasyMockTest {
@@ -92,20 +87,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private static final ResourceSlot DEFAULT_OFFER = ResourceSlot.from(
       Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80)));
 
-  private AttributeAggregate emptyJob;
-
-  private final AtomicLong taskIdCounter = new AtomicLong();
-
   private SchedulingFilter defaultFilter;
-  private AttributeStore.Mutable attributeStore;
 
   @Before
   public void setUp() {
     defaultFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-    attributeStore = createMock(AttributeStore.Mutable.class);
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        attributeStore);
   }
 
   @Test
@@ -145,22 +131,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         none,
         defaultFilter.filter(
             new UnusedResource(twoPorts, hostA),
-            new ResourceRequest(noPortTask, TASK_ID, emptyJob)));
+            new ResourceRequest(noPortTask, TASK_ID, EMPTY)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(twoPorts, hostA),
-            new ResourceRequest(onePortTask, TASK_ID, emptyJob)));
+            new ResourceRequest(onePortTask, TASK_ID, EMPTY)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(twoPorts, hostA),
-            new ResourceRequest(twoPortTask, TASK_ID, emptyJob)));
+            new ResourceRequest(twoPortTask, TASK_ID, EMPTY)));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
         defaultFilter.filter(
             new UnusedResource(twoPorts, hostA),
-            new ResourceRequest(threePortTask, TASK_ID, emptyJob)));
+            new ResourceRequest(threePortTask, TASK_ID, EMPTY)));
   }
 
   @Test
@@ -296,38 +282,41 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertNoVetoes(hostLimitTask(2), hostAttributes(HOST_A, host(HOST_A)));
   }
 
-  private Attribute host(String host) {
+  private IAttribute host(String host) {
     return valueAttribute(HOST_ATTRIBUTE, host);
   }
 
-  private Attribute rack(String rack) {
+  private IAttribute rack(String rack) {
     return valueAttribute(RACK_ATTRIBUTE, rack);
   }
 
-  private Attribute dedicated(String value, String... values) {
+  private IAttribute dedicated(String value, String... values) {
     return valueAttribute(DEDICATED_ATTRIBUTE, value, values);
   }
 
   @Test
   public void testLimitWithinJob() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostAttributes(HOST_B, host(HOST_B), rack(RACK_A)).atLeastOnce();
-    expectGetHostAttributes(HOST_C, host(HOST_C), rack(RACK_B)).atLeastOnce();
-
-    AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
-        makeScheduledTask(OWNER_A, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_C))),
-        attributeStore);
-    AttributeAggregate stateB = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
-        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_B))),
-        attributeStore);
-
     control.replay();
 
+    AttributeAggregate stateA = AttributeAggregate.create(
+        Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of(
+            host(HOST_A),
+            rack(RACK_A),
+            host(HOST_B),
+            rack(RACK_A),
+            host(HOST_B),
+            rack(RACK_A),
+            host(HOST_C),
+            rack(RACK_B))));
+    AttributeAggregate stateB = AttributeAggregate.create(
+        Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of(
+            host(HOST_A),
+            rack(RACK_A),
+            host(HOST_A),
+            rack(RACK_A),
+            host(HOST_B),
+            rack(RACK_A))));
+
     IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
     IHostAttributes hostB = hostAttributes(HOST_B, host(HOST_B), rack(RACK_A));
     IHostAttributes hostC = hostAttributes(HOST_C, host(HOST_C), rack(RACK_B));
@@ -421,7 +410,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.<Veto>of(),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(task, TASK_ID, emptyJob)));
+            new ResourceRequest(task, TASK_ID, EMPTY)));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -512,7 +501,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     return checkConstraint(
         owner,
         jobName,
-        emptyJob,
+        EMPTY,
         hostAttributes,
         constraintName,
         expected,
@@ -551,7 +540,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
-    assertVetoes(task, hostAttributes, emptyJob);
+    assertVetoes(task, hostAttributes, EMPTY);
   }
 
   private void assertNoVetoes(
@@ -563,7 +552,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
-    assertVetoes(task, hostAttributes, emptyJob, vetoes);
+    assertVetoes(task, hostAttributes, EMPTY, vetoes);
   }
 
   private void assertVetoes(
@@ -582,25 +571,25 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private static IHostAttributes hostAttributes(
       String host,
       MaintenanceMode mode,
-      Attribute... attributes) {
+      IAttribute... attributes) {
 
     return IHostAttributes.build(
         new HostAttributes()
             .setHost(host)
             .setMode(mode)
-            .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
+            .setAttributes(IAttribute.toBuildersSet(ImmutableSet.copyOf(attributes))));
   }
 
   private static IHostAttributes hostAttributes(
       String host,
-      Attribute... attributes) {
+      IAttribute... attributes) {
 
     return hostAttributes(host, MaintenanceMode.NONE, attributes);
   }
 
-  private Attribute valueAttribute(String name, String string, String... strings) {
-    return new Attribute(name,
-        ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build());
+  private IAttribute valueAttribute(String name, String string, String... strings) {
+    return IAttribute.build(new Attribute(name,
+        ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build()));
   }
 
   private static Constraint makeConstraint(String name, String... values) {
@@ -608,25 +597,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
   }
 
-  private IExpectationSetters<Optional<IHostAttributes>> expectGetHostAttributes(
-      String host,
-      Attribute... attributes) {
-
-    IHostAttributes hostAttributes = IHostAttributes.build(new HostAttributes()
-        .setHost(host)
-        .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
-    return expect(attributeStore.getHostAttributes(host)).andReturn(Optional.of(hostAttributes));
-  }
-
-  private IScheduledTask makeScheduledTask(Identity owner, String jobName, String host) {
-    return IScheduledTask.build(new ScheduledTask().setAssignedTask(
-        new AssignedTask()
-            .setSlaveHost(host)
-            .setTaskId("Task-" + taskIdCounter.incrementAndGet())
-            .setTask(hostLimitTask(owner, jobName, 1 /* Max per host not used here. */)
-                .newBuilder())));
-  }
-
   private Constraint limitConstraint(String name, int value) {
     return new Constraint(name, TaskConstraint.limit(new LimitConstraint(value)));
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index aca0234..ff95c36 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -26,7 +25,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -34,7 +32,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.FrameworkID;
@@ -49,6 +46,7 @@ import org.apache.mesos.Protos.Value.Type;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
@@ -89,7 +87,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private SchedulingFilter filter;
   private MesosTaskFactory taskFactory;
   private TaskAssigner assigner;
-  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() throws Exception {
@@ -98,16 +95,13 @@ public class TaskAssignerImplTest extends EasyMockTest {
     filter = createMock(SchedulingFilter.class);
     taskFactory = createMock(MesosTaskFactory.class);
     assigner = new TaskAssignerImpl(stateManager, filter, taskFactory);
-    emptyJob = new AttributeAggregate(
-        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
-        createMock(AttributeStore.class));
   }
 
   @Test
   public void testAssignNoVetoes() {
     expect(filter.filter(
         new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
+        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)))
         .andReturn(ImmutableSet.<Veto>of());
     expect(stateManager.assignTask(
         storeProvider,
@@ -126,14 +120,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.maybeAssign(
             storeProvider,
             OFFER,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
+            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)));
   }
 
   @Test
   public void testAssignVetoes() {
     expect(filter.filter(
         new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
-        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
+        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
@@ -143,6 +137,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.maybeAssign(
             storeProvider,
             OFFER,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
+            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)));
   }
 }