You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/13 01:10:27 UTC

[1/2] incubator-aurora git commit: Store host attributes alongside offers to reduce number of lookups.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 2e2c4b37f -> b80e69c9b


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 7cf5d3e..4170902 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -33,6 +33,7 @@ import com.twitter.common.util.BackoffStrategy;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
@@ -40,6 +41,7 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -62,7 +64,6 @@ import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -72,13 +73,17 @@ import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -93,14 +98,10 @@ public class TaskSchedulerTest extends EasyMockTest {
 
   private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
 
-  private static final HostOffer OFFER_A =
-      new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
-  private static final HostOffer OFFER_B =
-      new HostOffer(Offers.makeOffer("OFFER_B", "HOST_B"), MaintenanceMode.SCHEDULED);
-  private static final HostOffer OFFER_C =
-      new HostOffer(Offers.makeOffer("OFFER_C", "HOST_C"), MaintenanceMode.DRAINING);
-  private static final HostOffer OFFER_D =
-      new HostOffer(Offers.makeOffer("OFFER_D", "HOST_D"), MaintenanceMode.DRAINED);
+  private static final HostOffer OFFER_A =  makeOffer("OFFER_A", "HOST_A", NONE);
+  private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
+  private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
+  private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
 
   private Storage storage;
 
@@ -150,7 +151,7 @@ public class TaskSchedulerTest extends EasyMockTest {
         capture(cacheSizeSupplier))).andReturn(stat);
 
     control.replay();
-    offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenance);
+    offerQueue = new OfferQueueImpl(driver, returnDelay, executor);
     TaskScheduler scheduler = new TaskSchedulerImpl(storage,
         stateManager,
         assigner,
@@ -173,10 +174,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     return expectOfferDeclineIn(10);
   }
 
-  private Capture<Runnable> expectOfferDeclineIn(int delayMillis) {
+  private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
     expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
     Capture<Runnable> runnable = createCapture();
-    executor.schedule(capture(runnable), eq((long) delayMillis), eq(TimeUnit.MILLISECONDS));
+    executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
     expectLastCall().andReturn(createMock(ScheduledFuture.class));
     return runnable;
   }
@@ -223,8 +224,8 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
-    offerQueue.addOffer(OFFER_B.getOffer());
+    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_B);
   }
 
   @Test
@@ -310,7 +311,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     changeState(task, INIT, PENDING);
     timeoutCapture.getValue().run();
     timeoutCapture2.getValue().run();
@@ -345,7 +346,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     timeoutCapture.getValue().run();
   }
 
@@ -372,7 +373,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     timeoutCapture.getValue().run();
     timeoutCapture2.getValue().run();
   }
@@ -394,7 +395,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     timeoutCapture.getValue().run();
     offerExpirationCapture.getValue().run();
     timeoutCapture2.getValue().run();
@@ -405,15 +406,16 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectAnyMaintenanceCalls();
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
 
-    Offer offerAB =
-        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build();
+    HostOffer offerAB = new HostOffer(
+        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
+        IHostAttributes.build(new HostAttributes()));
 
     driver.declineOffer(OFFER_A.getOffer().getId());
-    driver.declineOffer(offerAB.getId());
+    driver.declineOffer(offerAB.getOffer().getId());
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     offerQueue.addOffer(offerAB);
     offerExpirationCapture.getValue().run();
   }
@@ -431,7 +433,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     offerQueue.launchFirst(offerAcceptor);
     offerExpirationCapture.getValue().run();
   }
@@ -439,13 +441,9 @@ public class TaskSchedulerTest extends EasyMockTest {
   @Test
   public void testBasicMaintenancePreferences() {
     expectOffer();
-    expect(maintenance.getMode("HOST_D")).andReturn(OFFER_D.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
@@ -461,10 +459,10 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_D.getOffer());
-    offerQueue.addOffer(OFFER_C.getOffer());
-    offerQueue.addOffer(OFFER_B.getOffer());
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_D);
+    offerQueue.addOffer(OFFER_C);
+    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(OFFER_A);
 
     changeState(taskA, INIT, PENDING);
     captureA.getValue().run();
@@ -476,11 +474,8 @@ public class TaskSchedulerTest extends EasyMockTest {
   @Test
   public void testChangingMaintenancePreferences() {
     expectOffer();
-    expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
@@ -490,27 +485,28 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    HostOffer updatedOfferC =
-        new HostOffer(OFFER_C.getOffer(), MaintenanceMode.NONE);
+    HostOffer updatedOfferC = new HostOffer(
+        OFFER_C.getOffer(),
+        IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
     expect(assigner.maybeAssign(updatedOfferC, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
     driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
-    offerQueue.addOffer(OFFER_B.getOffer());
-    offerQueue.addOffer(OFFER_C.getOffer());
+    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(OFFER_C);
 
     // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
 
     // Expected order now (B), with (C, A) unschedulable
-    changeHostMaintenanceState("HOST_A", MaintenanceMode.DRAINING);
+    changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
     changeState(taskA, INIT, PENDING);
     captureA.getValue().run();
 
     // Expected order now (C), with (A) unschedulable and (B) already consumed
-    changeHostMaintenanceState("HOST_C", MaintenanceMode.NONE);
+    changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
     changeState(taskB, INIT, PENDING);
     captureB.getValue().run();
   }
@@ -564,10 +560,10 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
-    offerQueue.addOffer(OFFER_B.getOffer());
-    offerQueue.addOffer(OFFER_C.getOffer());
-    offerQueue.addOffer(OFFER_D.getOffer());
+    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(OFFER_C);
+    offerQueue.addOffer(OFFER_D);
     changeState(jobA0, INIT, PENDING);
     changeState(jobA1, INIT, PENDING);
     changeState(jobA2, INIT, PENDING);
@@ -593,7 +589,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_A);
     changeState(task, INIT, PENDING);
     timeoutCapture.getValue().run();
 
@@ -618,11 +614,22 @@ public class TaskSchedulerTest extends EasyMockTest {
   }
 
   private void expectAnyMaintenanceCalls() {
-    expect(maintenance.getMode(isA(String.class))).andReturn(MaintenanceMode.NONE).anyTimes();
+    expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
   }
 
-  private void changeHostMaintenanceState(String hostName, MaintenanceMode mode) {
+  private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
     offerQueue.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
-        IHostAttributes.build(new HostAttributes().setHost(hostName).setMode(mode))));
+        IHostAttributes.build(attributes.newBuilder().setMode(mode))));
+  }
+
+  private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
+    Offer offer = Offers.makeOffer(offerId, hostName);
+    return new HostOffer(
+        offer,
+        IHostAttributes.build(new HostAttributes()
+            .setHost(hostName)
+            .setSlaveId(offer.getSlaveId().getValue())
+            .setAttributes(ImmutableSet.<Attribute>of())
+            .setMode(mode)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 0318179..94f0a17 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -19,6 +19,7 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.ResourceSlot;
@@ -27,6 +28,7 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.junit.Before;
@@ -43,8 +45,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
       .setDiskMb(1024));
   private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
   private static final String TASK_ID = "taskId";
-  private static final String SLAVE = "slaveHost";
-  private static final MaintenanceMode MODE = MaintenanceMode.NONE;
+  private static final IHostAttributes ATTRIBUTES =
+      IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE));
 
   private static final Veto VETO_1 = new Veto("veto1", 1);
   private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -67,21 +69,21 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
     eventSink.post(new Vetoed(TASK_ID, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index bffbf83..e113eba 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -98,7 +98,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Amount.of(DEFAULT_RAM, Data.MB),
       Amount.of(DEFAULT_DISK, Data.MB),
       0);
-  private static final MaintenanceMode DEFAULT_MODE = MaintenanceMode.NONE;
 
   private AttributeAggregate emptyJob;
 
@@ -110,9 +109,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private AttributeStore.Mutable attributeStore;
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     storage = createMock(Storage.class);
-    defaultFilter = new SchedulingFilterImpl(storage);
+    defaultFilter = new SchedulingFilterImpl();
     storeProvider = createMock(StoreProvider.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
     emptyJob = new AttributeAggregate(
@@ -125,11 +124,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
   }
 
-  private void expectReads() throws Exception {
+  private void expectReads() {
     expect(storage.weaklyConsistentRead(EasyMock.<Quiet<Object>>anyObject()))
         .andAnswer(new IAnswer<Object>() {
           @Override
-          public Object answer() throws Exception {
+          public Object answer() {
             Quiet<?> arg = (Quiet<?>) EasyMock.getCurrentArguments()[0];
             return arg.apply(storeProvider);
           }
@@ -137,19 +136,18 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testMeetsOffer() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testMeetsOffer() {
     control.replay();
 
-    assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
-    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), emptyJob);
+    IHostAttributes attributes = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
+    assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), attributes);
+    assertNoVetoes(
+        makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1),
+        attributes);
   }
 
   @Test
-  public void testSufficientPorts() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testSufficientPorts() {
     control.replay();
 
     ResourceSlot twoPorts = ResourceSlot.from(
@@ -172,115 +170,111 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
 
     Set<Veto> none = ImmutableSet.of();
+    IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
     assertEquals(none,
-        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, noPortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(twoPorts, hostA, noPortTask, TASK_ID, emptyJob));
     assertEquals(none,
-        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, onePortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(twoPorts, hostA, onePortTask, TASK_ID, emptyJob));
     assertEquals(none,
-        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, twoPortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(twoPorts, hostA, twoPortTask, TASK_ID, emptyJob));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, threePortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(twoPorts, hostA, threePortTask, TASK_ID, emptyJob));
   }
 
   @Test
-  public void testInsufficientResources() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testInsufficientResources() {
     control.replay();
 
+    IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
     assertVetoes(
         makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
-        DEFAULT_MODE,
+        hostA,
         CPU.veto(1), DISK.veto(1), RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), DEFAULT_MODE, CPU.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), DEFAULT_MODE, RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DEFAULT_MODE, DISK.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), hostA, CPU.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), hostA, RAM.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), hostA, DISK.veto(1));
   }
 
   @Test
-  public void testDedicatedRole() throws Exception {
-    expectGetHostAttributes(HOST_A, dedicated(ROLE_A)).anyTimes();
-
+  public void testDedicatedRole() {
     control.replay();
 
-    checkConstraint(HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
-    assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, DEDICATED_HOST_VETO);
+    IHostAttributes hostA = hostAttributes(HOST_A, dedicated(ROLE_A));
+    checkConstraint(hostA, DEDICATED_ATTRIBUTE, true, ROLE_A);
+    assertVetoes(makeTask(OWNER_B, JOB_B), hostA, DEDICATED_HOST_VETO);
   }
 
   @Test
-  public void testSharedDedicatedHost() throws Exception {
-    String dedicated1 = "userA/jobA";
-    String dedicated2 = "kestrel/kestrel";
-
-    expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
-
+  public void testSharedDedicatedHost() {
     control.replay();
 
-    assertNoVetoes(checkConstraint(
-        new Identity().setRole("userA"),
-        "jobA",
-        HOST_A,
-        DEDICATED_ATTRIBUTE,
-        true,
-        dedicated1));
-    assertNoVetoes(checkConstraint(
-        new Identity().setRole("kestrel"),
-        "kestrel",
-        HOST_A,
-        DEDICATED_ATTRIBUTE,
-        true,
-        dedicated2));
+    String dedicated1 = "userA/jobA";
+    String dedicated2 = "kestrel/kestrel";
+    IHostAttributes hostA = hostAttributes(HOST_A, dedicated(dedicated1, dedicated2));
+    assertNoVetoes(
+        checkConstraint(
+            new Identity().setRole("userA"),
+            "jobA",
+            hostA,
+            DEDICATED_ATTRIBUTE,
+            true,
+            dedicated1),
+        hostA);
+    assertNoVetoes(
+        checkConstraint(
+            new Identity().setRole("kestrel"),
+            "kestrel",
+            hostA,
+            DEDICATED_ATTRIBUTE,
+            true,
+            dedicated2),
+        hostA);
   }
 
   @Test
-  public void testMultiValuedAttributes() throws Exception {
-    expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0")).anyTimes();
-    expectGetHostAttributes(HOST_B, valueAttribute("jvm", "1.0")).anyTimes();
-
+  public void testMultiValuedAttributes() {
     control.replay();
 
-    checkConstraint(HOST_A, "jvm", true, "1.0");
-    checkConstraint(HOST_A, "jvm", false, "4.0");
+    IHostAttributes hostA = hostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0"));
+    checkConstraint(hostA, "jvm", true, "1.0");
+    checkConstraint(hostA, "jvm", false, "4.0");
 
-    checkConstraint(HOST_A, "jvm", true, "1.0", "2.0");
-    checkConstraint(HOST_B, "jvm", false, "2.0", "3.0");
+    checkConstraint(hostA, "jvm", true, "1.0", "2.0");
+    IHostAttributes hostB = hostAttributes(HOST_A, valueAttribute("jvm", "1.0"));
+    checkConstraint(hostB, "jvm", false, "2.0", "3.0");
   }
 
   @Test
-  public void testHostScheduledForMaintenance() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testHostScheduledForMaintenance() {
     control.replay();
 
-    assertNoVetoes(makeTask(), HOST_A, MaintenanceMode.SCHEDULED);
+    assertNoVetoes(
+        makeTask(),
+        hostAttributes(HOST_A, MaintenanceMode.SCHEDULED, host(HOST_A), rack(RACK_A)));
   }
 
   @Test
-  public void testHostDrainingForMaintenance() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testHostDrainingForMaintenance() {
     control.replay();
 
     assertVetoes(makeTask(),
-        MaintenanceMode.DRAINING,
+        hostAttributes(HOST_A, MaintenanceMode.DRAINING, host(HOST_A), rack(RACK_A)),
         ConstraintFilter.maintenanceVeto("draining"));
   }
 
   @Test
-  public void testHostDrainedForMaintenance() throws Exception {
-    expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+  public void testHostDrainedForMaintenance() {
     control.replay();
 
-    assertVetoes(makeTask(), MaintenanceMode.DRAINED, ConstraintFilter.maintenanceVeto("drained"));
+    assertVetoes(
+        makeTask(),
+        hostAttributes(HOST_A, MaintenanceMode.DRAINED, host(HOST_A), rack(RACK_A)),
+        ConstraintFilter.maintenanceVeto("drained"));
   }
 
   @Test
-  public void testMultipleTaskConstraints() throws Exception {
-    expectGetHostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A));
-    expectGetHostAttributes(HOST_B, dedicated("xxx"), host(HOST_A));
-
+  public void testMultipleTaskConstraints() {
     control.replay();
 
     Constraint constraint1 = makeConstraint("host", HOST_A);
@@ -288,40 +282,36 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     assertVetoes(
         makeTask(OWNER_A, JOB_A, constraint1, constraint2),
-        HOST_A,
+        hostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A)),
         mismatchVeto(DEDICATED_ATTRIBUTE));
-    assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, DEFAULT_MODE);
+    assertNoVetoes(
+        makeTask(OWNER_B, JOB_B, constraint1, constraint2),
+        hostAttributes(HOST_B, dedicated("xxx"), host(HOST_A)));
   }
 
   @Test
-  public void testDedicatedMismatchShortCircuits() throws Exception {
+  public void testDedicatedMismatchShortCircuits() {
     // Ensures that a dedicated mismatch short-circuits other filter operations, such as
     // evaluation of limit constraints.  Reduction of task queries is the desired outcome.
 
-    expectGetHostAttributes(HOST_A, host(HOST_A));
-    expectGetHostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B));
-
     control.replay();
 
     Constraint hostLimit = limitConstraint("host", 1);
     assertVetoes(
         makeTask(OWNER_A, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
-        HOST_A,
+        hostAttributes(HOST_A, host(HOST_A)),
         mismatchVeto(DEDICATED_ATTRIBUTE));
     assertVetoes(
         makeTask(OWNER_B, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
-        HOST_B,
+        hostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B)),
         mismatchVeto(DEDICATED_ATTRIBUTE));
   }
 
   @Test
-  public void testUnderLimitNoTasks() throws Exception {
-    expectGetHostAttributes(HOST_A);
-    expectGetHostAttributes(HOST_A, host(HOST_A));
-
+  public void testUnderLimitNoTasks() {
     control.replay();
 
-    assertNoVetoes(hostLimitTask(2), HOST_A, DEFAULT_MODE);
+    assertNoVetoes(hostLimitTask(2), hostAttributes(HOST_A, host(HOST_A)));
   }
 
   private Attribute host(String host) {
@@ -356,90 +346,89 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, stateA);
+    IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
+    IHostAttributes hostB = hostAttributes(HOST_B, host(HOST_B), rack(RACK_A));
+    IHostAttributes hostC = hostAttributes(HOST_C, host(HOST_C), rack(RACK_B));
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), hostA, stateA);
     assertVetoes(
         hostLimitTask(OWNER_A, JOB_A, 1),
-        HOST_B,
+        hostB,
         stateA,
-        DEFAULT_MODE,
         limitVeto(HOST_ATTRIBUTE));
     assertVetoes(
         hostLimitTask(OWNER_A, JOB_A, 2),
-        HOST_B,
+        hostB,
         stateA,
-        DEFAULT_MODE,
         limitVeto(HOST_ATTRIBUTE));
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, stateA);
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), hostB, stateA);
 
     assertVetoes(
         rackLimitTask(OWNER_B, JOB_A, 2),
-        HOST_B,
+        hostB,
         stateB,
-        DEFAULT_MODE,
         limitVeto(RACK_ATTRIBUTE));
     assertVetoes(
         rackLimitTask(OWNER_B, JOB_A, 3),
-        HOST_B,
+        hostB,
         stateB,
-        DEFAULT_MODE,
         limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, stateB);
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), hostB, stateB);
 
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, stateB);
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), hostC, stateB);
 
     assertVetoes(
         rackLimitTask(OWNER_A, JOB_A, 1),
-        HOST_C,
+        hostC,
         stateA,
-        DEFAULT_MODE,
         limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, stateB);
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), hostC, stateB);
   }
 
   @Test
-  public void testAttribute() throws Exception {
-    expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0")).atLeastOnce();
-
+  public void testAttribute() {
     control.replay();
 
+    IHostAttributes hostA = hostAttributes(HOST_A, valueAttribute("jvm", "1.0"));
+
     // Matches attribute, matching value.
-    checkConstraint(HOST_A, "jvm", true, "1.0");
+    checkConstraint(hostA, "jvm", true, "1.0");
 
     // Matches attribute, different value.
-    checkConstraint(HOST_A, "jvm", false, "1.4");
+    checkConstraint(hostA, "jvm", false, "1.4");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.4");
+    checkConstraint(hostA, "xxx", false, "1.4");
 
     // Logical 'OR' matching attribute.
-    checkConstraint(HOST_A, "jvm", false, "1.2", "1.4");
+    checkConstraint(hostA, "jvm", false, "1.2", "1.4");
 
     // Logical 'OR' not matching attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+    checkConstraint(hostA, "xxx", false, "1.0", "1.4");
   }
 
   @Test
-  public void testAttributes() throws Exception {
-    expectGetHostAttributes(HOST_A,
-        valueAttribute("jvm", "1.4", "1.6", "1.7"),
-        valueAttribute("zone", "a", "b", "c")).atLeastOnce();
-
+  public void testAttributes() {
     control.replay();
 
+    IHostAttributes hostA = hostAttributes(
+        HOST_A,
+        valueAttribute("jvm", "1.4", "1.6", "1.7"),
+        valueAttribute("zone", "a", "b", "c"));
+
     // Matches attribute, matching value.
-    checkConstraint(HOST_A, "jvm", true, "1.4");
+    checkConstraint(hostA, "jvm", true, "1.4");
 
     // Matches attribute, different value.
-    checkConstraint(HOST_A, "jvm", false, "1.0");
+    checkConstraint(hostA, "jvm", false, "1.0");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.4");
+    checkConstraint(hostA, "xxx", false, "1.4");
 
     // Logical 'OR' with attribute and value match.
-    checkConstraint(HOST_A, "jvm", true, "1.2", "1.4");
+    checkConstraint(hostA, "jvm", true, "1.2", "1.4");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+    checkConstraint(hostA, "xxx", false, "1.0", "1.4");
 
     // Check that logical AND works.
     Constraint jvmConstraint = makeConstraint("jvm", "1.6");
@@ -447,12 +436,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
     assertTrue(
-        defaultFilter.filter(
-            DEFAULT_OFFER,
-            HOST_A,
-            DEFAULT_MODE,
-            task, TASK_ID,
-            emptyJob).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, hostA, task, TASK_ID, emptyJob).isEmpty());
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -460,7 +444,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     zoneNegated.getConstraint().getValue().setNegated(true);
     assertVetoes(
         makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
-        HOST_A,
+        hostA,
         mismatchVeto("jvm"));
   }
 
@@ -475,22 +459,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testDuplicatedAttribute() throws Exception {
-    expectGetHostAttributes(HOST_A,
-        valueAttribute("jvm", "1.4"),
-        valueAttribute("jvm", "1.6", "1.7")).atLeastOnce();
-
+  public void testDuplicatedAttribute() {
     control.replay();
 
+    IHostAttributes hostA = hostAttributes(HOST_A,
+        valueAttribute("jvm", "1.4"),
+        valueAttribute("jvm", "1.6", "1.7"));
+
     // Matches attribute, matching value.
-    checkConstraint(HOST_A, "jvm", true, "1.4");
-    checkConstraint(HOST_A, "jvm", true, "1.6");
-    checkConstraint(HOST_A, "jvm", true, "1.7");
-    checkConstraint(HOST_A, "jvm", true, "1.6", "1.7");
+    checkConstraint(hostA, "jvm", true, "1.4");
+    checkConstraint(hostA, "jvm", true, "1.6");
+    checkConstraint(hostA, "jvm", true, "1.7");
+    checkConstraint(hostA, "jvm", true, "1.6", "1.7");
   }
 
   private ITaskConfig checkConstraint(
-      String host,
+      IHostAttributes hostAttributes,
       String constraintName,
       boolean expected,
       String value,
@@ -499,7 +483,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     return checkConstraint(
         OWNER_A,
         JOB_A,
-        host,
+        hostAttributes,
         constraintName,
         expected,
         value,
@@ -509,13 +493,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig checkConstraint(
       Identity owner,
       String jobName,
-      String host,
+      IHostAttributes hostAttributes,
       String constraintName,
       boolean expected,
       String value,
       String... vs) {
 
-    return checkConstraint(owner, jobName, emptyJob, host, constraintName, expected,
+    return checkConstraint(
+        owner,
+        jobName,
+        emptyJob,
+        hostAttributes,
+        constraintName,
+        expected,
         new ValueConstraint(false,
             ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
   }
@@ -524,7 +514,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Identity owner,
       String jobName,
       AttributeAggregate aggregate,
-      String host,
+      IHostAttributes hostAttributes,
       String constraintName,
       boolean expected,
       ValueConstraint value) {
@@ -534,9 +524,8 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertEquals(
         expected,
         defaultFilter.filter(
-            DEFAULT_OFFER,
-            host,
-            DEFAULT_MODE,
+             DEFAULT_OFFER,
+            hostAttributes,
             task,
             TASK_ID,
             aggregate).isEmpty());
@@ -548,63 +537,57 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         !expected,
         defaultFilter.filter(
             DEFAULT_OFFER,
-            host,
-            DEFAULT_MODE,
+            hostAttributes,
             negatedTask,
             TASK_ID,
             aggregate).isEmpty());
     return task;
   }
 
-  private void assertNoVetoes(ITaskConfig task) {
-    assertNoVetoes(task, emptyJob);
-  }
-
-  private void assertNoVetoes(ITaskConfig task, AttributeAggregate jobState) {
-    assertNoVetoes(task, HOST_A, jobState);
-  }
-
-  private void assertNoVetoes(ITaskConfig task, String host, MaintenanceMode mode) {
-    assertVetoes(task, host, emptyJob, mode);
+  private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
+    assertVetoes(task, hostAttributes, emptyJob);
   }
 
   private void assertNoVetoes(
       ITaskConfig task,
-      String host,
+      IHostAttributes attributes,
       AttributeAggregate jobState) {
-    assertVetoes(task, host, jobState, DEFAULT_MODE);
+
+    assertVetoes(task, attributes, jobState);
   }
 
-  private void assertVetoes(ITaskConfig task, MaintenanceMode mode, Veto... vetos) {
-    assertVetoes(task, emptyJob, mode, vetos);
+  private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
+    assertVetoes(task, hostAttributes, emptyJob, vetoes);
   }
 
   private void assertVetoes(
       ITaskConfig task,
+      IHostAttributes hostAttributes,
       AttributeAggregate jobState,
-      MaintenanceMode mode,
-      Veto... vetos) {
-    assertVetoes(task, HOST_A, jobState, mode, vetos);
+      Veto... vetoes) {
+
+    assertEquals(
+        ImmutableSet.copyOf(vetoes),
+        defaultFilter.filter(DEFAULT_OFFER, hostAttributes, task, TASK_ID, jobState));
   }
 
-  private void assertVetoes(
-      ITaskConfig task,
+  private static IHostAttributes hostAttributes(
       String host,
-      Veto... vetoes) {
+      MaintenanceMode mode,
+      Attribute... attributes) {
 
-    assertVetoes(task, host, emptyJob, DEFAULT_MODE, vetoes);
+    return IHostAttributes.build(
+        new HostAttributes()
+            .setHost(host)
+            .setMode(mode)
+            .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
   }
 
-  private void assertVetoes(
-      ITaskConfig task,
+  private static IHostAttributes hostAttributes(
       String host,
-      AttributeAggregate jobState,
-      MaintenanceMode mode,
-      Veto... vetoes) {
+      Attribute... attributes) {
 
-    assertEquals(
-        ImmutableSet.copyOf(vetoes),
-        defaultFilter.filter(DEFAULT_OFFER, host, mode, task, TASK_ID, jobState));
+    return hostAttributes(host, MaintenanceMode.NONE, attributes);
   }
 
   private Attribute valueAttribute(String name, String string, String... strings) {
@@ -676,11 +659,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setExecutorConfig(new ExecutorConfig("aurora", "config"))));
   }
 
-  private ITaskConfig makeTask(int cpus, long ramMb, long diskMb) throws Exception {
+  private ITaskConfig makeTask(int cpus, long ramMb, long diskMb) {
     return makeTask(OWNER_A, JOB_A, cpus, ramMb, diskMb);
   }
 
-  private ITaskConfig makeTask() throws Exception {
+  private ITaskConfig makeTask() {
     return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index 052562f..d02c6b3 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.testing.TearDown;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
@@ -32,8 +33,10 @@ import com.twitter.common.application.Lifecycle;
 import com.twitter.common.base.Command;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
@@ -47,7 +50,6 @@ import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -58,6 +60,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertTrue;
 
@@ -75,18 +79,32 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       ExecutorID.newBuilder().setValue("executor-id").build();
 
   private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
-  private static final Offer OFFER = Offer.newBuilder()
-      .setFrameworkId(FRAMEWORK)
-      .setSlaveId(SLAVE_ID)
-      .setHostname(SLAVE_HOST)
-      .setId(OFFER_ID)
-      .build();
+  private static final HostOffer OFFER = new HostOffer(
+      Offer.newBuilder()
+          .setFrameworkId(FRAMEWORK)
+          .setSlaveId(SLAVE_ID)
+          .setHostname(SLAVE_HOST)
+          .setId(OFFER_ID)
+          .build(),
+      IHostAttributes.build(
+          new HostAttributes()
+              .setHost(SLAVE_HOST)
+              .setSlaveId(SLAVE_ID.getValue())
+              .setMode(NONE)
+              .setAttributes(ImmutableSet.<Attribute>of())));
   private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
-  private static final Offer OFFER_2 = Offer.newBuilder(OFFER)
-      .setSlaveId(SLAVE_ID_2)
-      .setHostname(SLAVE_HOST_2)
-      .setId(OFFER_ID_2)
-      .build();
+  private static final HostOffer OFFER_2 = new HostOffer(
+      Offer.newBuilder(OFFER.getOffer())
+          .setSlaveId(SLAVE_ID_2)
+          .setHostname(SLAVE_HOST_2)
+          .setId(OFFER_ID_2)
+          .build(),
+      IHostAttributes.build(
+          new HostAttributes()
+              .setHost(SLAVE_HOST_2)
+              .setSlaveId(SLAVE_ID_2.getValue())
+              .setMode(NONE)
+              .setAttributes(ImmutableSet.<Attribute>of())));
 
   private static final TaskStatus STATUS = TaskStatus.newBuilder()
       .setState(TaskState.TASK_RUNNING)
@@ -184,15 +202,17 @@ public class MesosSchedulerImplTest extends EasyMockTest {
     new OfferFixture() {
       @Override
       void respondToOffer() throws Exception {
-        IHostAttributes draining = IHostAttributes.build(new HostAttributes().setMode(DRAINING));
-        expect(storageUtil.attributeStore.getHostAttributes(OFFER.getHostname()))
+        IHostAttributes draining =
+            IHostAttributes.build(OFFER.getAttributes().newBuilder().setMode(DRAINING));
+        expect(storageUtil.attributeStore.getHostAttributes(OFFER.getOffer().getHostname()))
             .andReturn(Optional.of(draining));
         IHostAttributes saved = IHostAttributes.build(
-            Conversions.getAttributes(OFFER).newBuilder().setMode(DRAINING));
+            Conversions.getAttributes(OFFER.getOffer()).newBuilder().setMode(DRAINING));
         expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
 
-        expect(systemLauncher.willUse(OFFER)).andReturn(false);
-        expect(userLauncher.willUse(OFFER)).andReturn(true);
+        HostOffer offer = new HostOffer(OFFER.getOffer(), draining);
+        expect(systemLauncher.willUse(offer)).andReturn(false);
+        expect(userLauncher.willUse(offer)).andReturn(true);
       }
     }.run();
   }
@@ -255,7 +275,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
 
       @Override
       void test() {
-        scheduler.resourceOffers(driver, ImmutableList.of(OFFER, OFFER_2));
+        scheduler.resourceOffers(driver, ImmutableList.of(OFFER.getOffer(), OFFER_2.getOffer()));
       }
     }.run();
   }
@@ -286,11 +306,11 @@ public class MesosSchedulerImplTest extends EasyMockTest {
         "hello".getBytes(StandardCharsets.UTF_8));
   }
 
-  private void expectOfferAttributesSaved(Offer offer) {
-    expect(storageUtil.attributeStore.getHostAttributes(offer.getHostname()))
+  private void expectOfferAttributesSaved(HostOffer offer) {
+    expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
         .andReturn(Optional.<IHostAttributes>absent());
     IHostAttributes defaultMode = IHostAttributes.build(
-        Conversions.getAttributes(offer).newBuilder().setMode(MaintenanceMode.NONE));
+        Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(MaintenanceMode.NONE));
     expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
   }
 
@@ -341,7 +361,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
 
     @Override
     void test() {
-      scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
+      scheduler.resourceOffers(driver, ImmutableList.of(OFFER.getOffer()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 563c1be..8d69ae9 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,9 +20,10 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -31,9 +32,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -45,14 +46,14 @@ import org.apache.mesos.Protos.Value.Type;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class TaskAssignerImplTest extends EasyMockTest {
 
   private static final int PORT = 5000;
-  private static final Offer OFFER = Offer.newBuilder()
+  private static final Offer MESOS_OFFER = Offer.newBuilder()
       .setId(OfferID.newBuilder().setValue("offerId"))
       .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
       .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
@@ -63,7 +64,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
           .setRanges(
               Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
       .build();
-  private static final HostOffer HOST_OFFER = new HostOffer(OFFER, MaintenanceMode.NONE);
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
   private static final IScheduledTask TASK = IScheduledTask.build(
       new ScheduledTask()
           .setAssignedTask(new AssignedTask()
@@ -74,7 +76,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
       .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
-      .setSlaveId(OFFER.getSlaveId())
+      .setSlaveId(MESOS_OFFER.getSlaveId())
       .build();
 
   private StateManager stateManager;
@@ -97,32 +99,31 @@ public class TaskAssignerImplTest extends EasyMockTest {
   @Test
   public void testAssignNoVetoes() {
     expect(filter.filter(
-        ResourceSlot.from(OFFER),
-        OFFER.getHostname(),
-        MaintenanceMode.NONE,
+        ResourceSlot.from(MESOS_OFFER),
+        OFFER.getAttributes(),
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
         emptyJob))
         .andReturn(ImmutableSet.<Veto>of());
     expect(stateManager.assignTask(
         Tasks.id(TASK),
-        OFFER.getHostname(),
-        OFFER.getSlaveId(),
+        MESOS_OFFER.getHostname(),
+        MESOS_OFFER.getSlaveId(),
         ImmutableSet.of(PORT)))
         .andReturn(TASK.getAssignedTask());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getSlaveId())).andReturn(TASK_INFO);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+        .andReturn(TASK_INFO);
 
     control.replay();
 
-    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
+    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
   }
 
   @Test
   public void testAssignVetoes() {
     expect(filter.filter(
-        ResourceSlot.from(OFFER),
-        OFFER.getHostname(),
-        MaintenanceMode.NONE,
+        ResourceSlot.from(MESOS_OFFER),
+        OFFER.getAttributes(),
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
         emptyJob))
@@ -130,6 +131,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
+    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
   }
 }


[2/2] incubator-aurora git commit: Store host attributes alongside offers to reduce number of lookups.

Posted by wf...@apache.org.
Store host attributes alongside offers to reduce number of lookups.

Bugs closed: AURORA-913

Reviewed at https://reviews.apache.org/r/27902/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b80e69c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b80e69c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b80e69c9

Branch: refs/heads/master
Commit: b80e69c9b9d0e18b9ea1189fd310e6995e4e1fe9
Parents: 2e2c4b3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 12 16:10:03 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Nov 12 16:10:03 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/HostOffer.java  |  66 ++++
 .../apache/aurora/scheduler/TaskLauncher.java   |   3 +-
 .../aurora/scheduler/UserTaskLauncher.java      |   3 +-
 .../scheduler/async/GcExecutorLauncher.java     |  18 +-
 .../aurora/scheduler/async/OfferQueue.java      | 125 ++-----
 .../aurora/scheduler/async/Preemptor.java       |  67 ++--
 .../async/RandomJitterReturnDelay.java          |   4 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  10 +-
 .../events/NotifyingSchedulingFilter.java       |   7 +-
 .../scheduler/filter/SchedulingFilter.java      |   7 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |  86 ++---
 .../apache/aurora/scheduler/http/Offers.java    |   4 +-
 .../scheduler/mesos/MesosSchedulerImpl.java     |  44 ++-
 .../scheduler/state/MaintenanceController.java  |   1 +
 .../aurora/scheduler/state/TaskAssigner.java    |  19 +-
 .../scheduler/stats/AsyncStatsModule.java       |   9 +-
 .../aurora/scheduler/UserTaskLauncherTest.java  |  13 +-
 .../scheduler/async/GcExecutorLauncherTest.java |  24 +-
 .../scheduler/async/OfferQueueImplTest.java     | 136 +++-----
 .../scheduler/async/PreemptorImplTest.java      |  53 +--
 .../scheduler/async/TaskSchedulerImplTest.java  |   9 +-
 .../scheduler/async/TaskSchedulerTest.java      | 107 +++---
 .../events/NotifyingSchedulingFilterTest.java   |  14 +-
 .../filter/SchedulingFilterImplTest.java        | 335 +++++++++----------
 .../scheduler/mesos/MesosSchedulerImplTest.java |  64 ++--
 .../scheduler/state/TaskAssignerImplTest.java   |  35 +-
 26 files changed, 625 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/HostOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
new file mode 100644
index 0000000..5056b60
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * An available resource in the cluster.
+ */
+public class HostOffer {
+  private final Offer offer;
+  private final IHostAttributes hostAttributes;
+
+  public HostOffer(Offer offer, IHostAttributes hostAttributes) {
+    this.offer = requireNonNull(offer);
+    this.hostAttributes = requireNonNull(hostAttributes);
+  }
+
+  public Offer getOffer() {
+    return offer;
+  }
+
+  public IHostAttributes getAttributes() {
+    return hostAttributes;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HostOffer)) {
+      return false;
+    }
+    HostOffer other = (HostOffer) o;
+    return Objects.equals(offer, other.offer)
+        && Objects.equals(hostAttributes, other.hostAttributes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(offer, hostAttributes);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("offer", offer)
+        .add("hostAttributes", hostAttributes)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
index c13520a..cd55a6e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler;
 
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
@@ -33,7 +32,7 @@ public interface TaskLauncher {
    * @return {@code false} if the launcher will not act on the offer, or {@code true} if the
    *         launcher may accept the offer at some point in the future.
    */
-  boolean willUse(Offer offer);
+  boolean willUse(HostOffer offer);
 
   /**
    * Informs the launcher that a status update has been received for a task.  If the task is not

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 250c2df..e1b7d05 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
@@ -56,7 +55,7 @@ class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public boolean willUse(Offer offer) {
+  public boolean willUse(HostOffer offer) {
     requireNonNull(offer);
 
     offerQueue.addOffer(offer);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index 79d8d8d..e02921d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -41,6 +41,7 @@ import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.Query;
@@ -52,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -192,20 +192,22 @@ public class GcExecutorLauncher implements TaskLauncher {
             Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS)));
   }
 
-  private boolean sufficientResources(Offer offer) {
-    boolean sufficient = Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
+  private boolean sufficientResources(HostOffer offer) {
+    boolean sufficient =
+        Resources.from(offer.getOffer()).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
     if (!sufficient) {
-      LOG.warning("Offer for host " + offer.getHostname() + " is too small for a GC executor");
+      LOG.warning("Offer for host " + offer.getOffer().getHostname()
+          + " is too small for a GC executor");
       insufficientOffers.incrementAndGet();
     }
     return sufficient;
   }
 
   @Override
-  public boolean willUse(final Offer offer) {
+  public boolean willUse(final HostOffer offer) {
     if (!settings.getGcExecutorPath().isPresent()
         || !sufficientResources(offer)
-        || !isTimeToCollect(offer.getHostname())) {
+        || !isTimeToCollect(offer.getOffer().getHostname())) {
 
       return false;
     }
@@ -213,7 +215,9 @@ public class GcExecutorLauncher implements TaskLauncher {
     executor.execute(new Runnable() {
       @Override
       public void run() {
-        driver.launchTask(offer.getId(), makeGcTask(offer.getHostname(), offer.getSlaveId()));
+        driver.launchTask(
+            offer.getOffer().getId(),
+            makeGcTask(offer.getOffer().getHostname(), offer.getOffer().getSlaveId()));
       }
     });
     offersConsumed.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index dd8a900..d2682cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.Comparator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,11 +36,11 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
@@ -64,7 +63,7 @@ public interface OfferQueue extends EventSubscriber {
    *
    * @param offer Newly-available resource offer.
    */
-  void addOffer(Offer offer);
+  void addOffer(HostOffer offer);
 
   /**
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
@@ -104,7 +103,7 @@ public interface OfferQueue extends EventSubscriber {
    * The delay is calculated for each offer that is received, so the return delay may be
    * fixed or variable.
    */
-  interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
+  interface OfferReturnDelay extends Supplier<Amount<Long, Time>> {
   }
 
   /**
@@ -120,51 +119,6 @@ public interface OfferQueue extends EventSubscriber {
     }
   }
 
-  /**
-   * Encapsulate an offer from a host, and the host's maintenance mode.
-   */
-  class HostOffer {
-    private final Offer offer;
-
-    // TODO(wfarner): Replace this with HostAttributes for more use of this caching.
-    private final MaintenanceMode mode;
-
-    public HostOffer(Offer offer, MaintenanceMode mode) {
-      this.offer = requireNonNull(offer);
-      this.mode = requireNonNull(mode);
-    }
-
-    public Offer getOffer() {
-      return offer;
-    }
-
-    public MaintenanceMode getMode() {
-      return mode;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof HostOffer)) {
-        return false;
-      }
-      HostOffer other = (HostOffer) o;
-      return Objects.equals(offer, other.offer) && mode == other.mode;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(offer, mode);
-    }
-
-    @Override
-    public String toString() {
-      return com.google.common.base.Objects.toStringHelper(this)
-          .add("offer", offer)
-          .add("mode", mode)
-          .toString();
-    }
-  }
-
   class OfferQueueImpl implements OfferQueue {
     private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
 
@@ -174,41 +128,36 @@ public interface OfferQueue extends EventSubscriber {
     private final Driver driver;
     private final OfferReturnDelay returnDelay;
     private final ScheduledExecutorService executor;
-    private final MaintenanceController maintenance;
 
     @Inject
-    OfferQueueImpl(Driver driver,
-        OfferReturnDelay returnDelay,
-        ScheduledExecutorService executor,
-        MaintenanceController maintenance) {
-
-      this.driver = driver;
-      this.returnDelay = returnDelay;
-      this.executor = executor;
-      this.maintenance = maintenance;
+    OfferQueueImpl(Driver driver, OfferReturnDelay returnDelay, ScheduledExecutorService executor) {
+      this.driver = requireNonNull(driver);
+      this.returnDelay = requireNonNull(returnDelay);
+      this.executor = requireNonNull(executor);
     }
 
     @Override
-    public void addOffer(final Offer offer) {
+    public void addOffer(final HostOffer offer) {
       // We run a slight risk of a race here, which is acceptable.  The worst case is that we
       // temporarily hold two offers for the same host, which should be corrected when we return
       // them after the return delay.
       // There's also a chance that we return an offer for compaction ~simultaneously with the
       // same-host offer being canceled/returned.  This is also fine.
-      Optional<HostOffer> sameSlave = hostOffers.get(offer.getSlaveId());
+      Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId());
       if (sameSlave.isPresent()) {
         // If there are existing offers for the slave, decline all of them so the master can
         // compact all of those offers into a single offer and send them back.
-        LOG.info("Returning offers for " + offer.getSlaveId().getValue() + " for compaction.");
-        decline(offer.getId());
-        removeAndDecline(sameSlave.get().offer.getId());
+        LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue()
+            + " for compaction.");
+        decline(offer.getOffer().getId());
+        removeAndDecline(sameSlave.get().getOffer().getId());
       } else {
-        hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
+        hostOffers.add(offer);
         executor.schedule(
             new Runnable() {
               @Override
               public void run() {
-                removeAndDecline(offer.getId());
+                removeAndDecline(offer.getOffer().getId());
               }
             },
             returnDelay.get().as(Time.MILLISECONDS),
@@ -252,7 +201,7 @@ public interface OfferQueue extends EventSubscriber {
      */
     @Subscribe
     public void hostAttributesChanged(HostAttributesChanged change) {
-      hostOffers.updateHostMode(change.getAttributes().getHost(), change.getAttributes().getMode());
+      hostOffers.updateHostAttributes(change.getAttributes());
     }
 
     /**
@@ -280,12 +229,12 @@ public interface OfferQueue extends EventSubscriber {
               .onResultOf(new Function<HostOffer, MaintenanceMode>() {
                 @Override
                 public MaintenanceMode apply(HostOffer offer) {
-                  return offer.mode;
+                  return offer.getAttributes().getMode();
                 }
               })
               .compound(Ordering.arbitrary());
 
-      private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+      private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
       private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
       private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
       private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
@@ -293,7 +242,7 @@ public interface OfferQueue extends EventSubscriber {
       HostOffers() {
         // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
         // Could track this separately if it turns out to pose problems.
-        Stats.exportSize("outstanding_offers", hostOffers);
+        Stats.exportSize("outstanding_offers", offers);
       }
 
       synchronized Optional<HostOffer> get(SlaveID slaveId) {
@@ -301,37 +250,37 @@ public interface OfferQueue extends EventSubscriber {
       }
 
       synchronized void add(HostOffer offer) {
-        hostOffers.add(offer);
-        offersById.put(offer.offer.getId(), offer);
-        offersBySlave.put(offer.offer.getSlaveId(), offer);
-        offersByHost.put(offer.offer.getHostname(), offer);
+        offers.add(offer);
+        offersById.put(offer.getOffer().getId(), offer);
+        offersBySlave.put(offer.getOffer().getSlaveId(), offer);
+        offersByHost.put(offer.getOffer().getHostname(), offer);
       }
 
       synchronized boolean remove(OfferID id) {
         HostOffer removed = offersById.remove(id);
         if (removed != null) {
-          hostOffers.remove(removed);
-          offersBySlave.remove(removed.offer.getSlaveId());
-          offersByHost.remove(removed.offer.getHostname());
+          offers.remove(removed);
+          offersBySlave.remove(removed.getOffer().getSlaveId());
+          offersByHost.remove(removed.getOffer().getHostname());
         }
         return removed != null;
       }
 
-      synchronized void updateHostMode(String hostName, MaintenanceMode mode) {
-        HostOffer offer = offersByHost.remove(hostName);
+      synchronized void updateHostAttributes(IHostAttributes attributes) {
+        HostOffer offer = offersByHost.remove(attributes.getHost());
         if (offer != null) {
           // Remove and re-add a host's offer to re-sort based on its new hostStatus
-          remove(offer.offer.getId());
-          add(new HostOffer(offer.offer, mode));
+          remove(offer.getOffer().getId());
+          add(new HostOffer(offer.getOffer(),  attributes));
         }
       }
 
       synchronized Iterable<HostOffer> getWeaklyConsistentOffers() {
-        return Iterables.unmodifiableIterable(hostOffers);
+        return Iterables.unmodifiableIterable(offers);
       }
 
       synchronized void clear() {
-        hostOffers.clear();
+        offers.clear();
         offersById.clear();
         offersBySlave.clear();
         offersByHost.clear();
@@ -345,17 +294,17 @@ public interface OfferQueue extends EventSubscriber {
       // It's important that this method is not called concurrently - doing so would open up the
       // possibility of a race between the same offers being accepted by different threads.
 
-      for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) {
-        Optional<TaskInfo> assignment = acceptor.apply(hostOffer);
+      for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
+        Optional<TaskInfo> assignment = acceptor.apply(offer);
         if (assignment.isPresent()) {
           // Guard against an offer being removed after we grabbed it from the iterator.
           // If that happens, the offer will not exist in hostOffers, and we can immediately
           // send it back to LOST for quick reschedule.
           // Removing while iterating counts on the use of a weakly-consistent iterator being used,
           // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(hostOffer.offer.getId())) {
+          if (hostOffers.remove(offer.getOffer().getId())) {
             try {
-              driver.launchTask(hostOffer.offer.getId(), assignment.get());
+              driver.launchTask(offer.getOffer().getId(), assignment.get());
               return true;
             } catch (IllegalStateException e) {
               // TODO(William Farner): Catch only the checked exception produced by Driver

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index a17738e..1d337f6 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -40,19 +40,20 @@ import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -63,8 +64,9 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import static org.apache.aurora.scheduler.storage.Storage.Work;
 
 /**
  * Preempts active tasks in favor of higher priority tasks.
@@ -130,7 +132,7 @@ public interface Preemptor {
     private final SchedulingFilter schedulingFilter;
     private final Amount<Long, Time> preemptionCandidacyDelay;
     private final Clock clock;
-    private final MaintenanceController maintenance;
+    private final AtomicLong missingAttributes;
 
     /**
      * Creates a new preemptor.
@@ -151,7 +153,7 @@ public interface Preemptor {
         SchedulingFilter schedulingFilter,
         @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
         Clock clock,
-        MaintenanceController maintenance) {
+        StatsProvider statsProvider) {
 
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
@@ -159,7 +161,7 @@ public interface Preemptor {
       this.schedulingFilter = requireNonNull(schedulingFilter);
       this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
       this.clock = requireNonNull(clock);
-      this.maintenance = requireNonNull(maintenance);
+      missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
     }
 
     private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
@@ -200,24 +202,24 @@ public interface Preemptor {
     private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
         new Function<HostOffer, ResourceSlot>() {
           @Override
-          public ResourceSlot apply(HostOffer hostOffer) {
-            return ResourceSlot.from(hostOffer.getOffer());
+          public ResourceSlot apply(HostOffer offer) {
+            return ResourceSlot.from(offer.getOffer());
           }
         };
 
     private static final Function<HostOffer, String> OFFER_TO_HOST =
         new Function<HostOffer, String>() {
           @Override
-          public String apply(HostOffer hostOffer) {
-            return hostOffer.getOffer().getHostname();
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getHostname();
           }
         };
 
-    private static final Function<HostOffer, MaintenanceMode> OFFER_TO_MODE =
-        new Function<HostOffer, MaintenanceMode>() {
+    private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
+        new Function<HostOffer, IHostAttributes>() {
           @Override
-          public MaintenanceMode apply(HostOffer hostOffer) {
-            return hostOffer.getMode();
+          public IHostAttributes apply(HostOffer offer) {
+            return offer.getAttributes();
           }
         };
 
@@ -255,18 +257,17 @@ public interface Preemptor {
           // us.
           return Optional.absent();
         }
-        MaintenanceMode mode =
-            Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet());
+        IHostAttributes attributes = Iterables.getOnlyElement(
+            FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
 
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
             slackResources,
-            host,
-            mode,
+            attributes,
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
 
-        if (vetos.isEmpty()) {
+        if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
         }
       }
@@ -289,26 +290,40 @@ public interface Preemptor {
             ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
             slackResources);
 
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+        Optional<IHostAttributes> attributes = getHostAttributes(host);
+        if (!attributes.isPresent()) {
+          missingAttributes.incrementAndGet();
+          continue;
+        }
+
+        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
             totalResource,
-            host,
-            maintenance.getMode(host),
+            attributes.get(),
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
 
-        if (vetos.isEmpty()) {
+        if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
         }
       }
       return Optional.absent();
     }
 
+    private Optional<IHostAttributes> getHostAttributes(final String host) {
+      return storage.weaklyConsistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
+        @Override
+        public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
+          return storeProvider.getAttributeStore().getHostAttributes(host);
+        }
+      });
+    }
+
     private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
         new Function<HostOffer, String>() {
           @Override
-          public String apply(HostOffer hostOffer) {
-            return hostOffer.getOffer().getSlaveId().getValue();
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getSlaveId().getValue();
           }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
index d15d9e6..2accb4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
@@ -43,7 +43,7 @@ class RandomJitterReturnDelay implements OfferReturnDelay {
   }
 
   @Override
-  public Amount<Integer, Time> get() {
-    return Amount.of(minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
+  public Amount<Long, Time> get() {
+    return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index b23457e..e2ba8b8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -41,6 +41,7 @@ import com.twitter.common.stats.Stats;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -65,7 +66,6 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 
 /**
  * Enables scheduling and preemption of tasks.
@@ -134,20 +134,20 @@ public interface TaskScheduler extends EventSubscriber {
 
       return new Function<HostOffer, Optional<TaskInfo>>() {
         @Override
-        public Optional<TaskInfo> apply(HostOffer hostOffer) {
+        public Optional<TaskInfo> apply(HostOffer offer) {
           Optional<String> reservedTaskId =
-              reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId());
+              reservations.getSlaveReservation(offer.getOffer().getSlaveId());
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+              return assigner.maybeAssign(offer, task, attributeAggregate);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+            return assigner.maybeAssign(offer, task, attributeAggregate);
           }
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index fc17cac..ca53303 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,11 +20,11 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -60,13 +60,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   @Override
   public Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes hostAttributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate jobState) {
 
-    Set<Veto> vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState);
+    Set<Veto> vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState);
     if (!vetoes.isEmpty()) {
       eventSink.post(new Vetoed(taskId, vetoes));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index c37272c..c1c5f26 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -18,8 +18,8 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 /**
@@ -105,8 +105,6 @@ public interface SchedulingFilter {
    * Applies a task against the filter with the given resources, and on the host.
    *
    * @param offer Resources offered.
-   * @param slaveHost Host that the resources are associated with.
-   * @param mode Maintenance mode of the host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
@@ -115,8 +113,7 @@ public interface SchedulingFilter {
    */
   Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes attributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 0533baa..cc6b53b 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -17,8 +17,6 @@ import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.Set;
 
-import javax.inject.Inject;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -33,16 +31,10 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
-import static java.util.Objects.requireNonNull;
-
 import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -65,18 +57,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
 
-  private final Storage storage;
-
-  /**
-   * Creates a new scheduling filter.
-   *
-   * @param storage Interface to accessing the task store.
-   */
-  @Inject
-  public SchedulingFilterImpl(Storage storage) {
-    this.storage = requireNonNull(storage);
-  }
-
   /**
    * A function that may veto a task.
    */
@@ -187,7 +167,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private FilterRule getConstraintFilter(
       final AttributeAggregate jobState,
-      final String slaveHost) {
+      final IHostAttributes offerAttributes) {
 
     return new FilterRule() {
       @Override
@@ -196,32 +176,23 @@ public class SchedulingFilterImpl implements SchedulingFilter {
           return ImmutableList.of();
         }
 
-        // In the interest of performance, we perform a weakly consistent read here.  The biggest
-        // risk of this is that we might schedule against stale host attributes, or we might fail
-        // to correctly satisfy a diversity constraint.  Given that the likelihood is relatively low
-        // for both of these, and the impact is also low, the weak consistency is acceptable.
-        return storage.weaklyConsistentRead(new Quiet<Iterable<Veto>>() {
-          @Override
-          public Iterable<Veto> apply(final StoreProvider storeProvider) {
-            ConstraintFilter constraintFilter = new ConstraintFilter(
-                jobState,
-                AttributeStore.Util.attributesOrNone(storeProvider, slaveHost));
-            ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
-            for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
-              Optional<Veto> veto = constraintFilter.getVeto(constraint);
-              if (veto.isPresent()) {
-                vetoes.add(veto.get());
-                if (isValueConstraint(constraint)) {
-                  // Break when a value constraint mismatch is found to avoid other
-                  // potentially-expensive operations to satisfy other constraints.
-                  break;
-                }
-              }
+        ConstraintFilter constraintFilter = new ConstraintFilter(
+            jobState,
+            offerAttributes.getAttributes());
+        ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+        for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
+          Optional<Veto> veto = constraintFilter.getVeto(constraint);
+          if (veto.isPresent()) {
+            vetoes.add(veto.get());
+            if (isValueConstraint(constraint)) {
+              // Break when a value constraint mismatch is found to avoid other
+              // potentially-expensive operations to satisfy other constraints.
+              break;
             }
-
-            return vetoes.build();
           }
-        });
+        }
+
+        return vetoes.build();
       }
     };
   }
@@ -240,38 +211,31 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     return builder.build();
   }
 
-  private boolean isDedicated(final String slaveHost) {
-    Iterable<IAttribute> slaveAttributes =
-        storage.weaklyConsistentRead(new Quiet<Iterable<IAttribute>>() {
-          @Override
-          public Iterable<IAttribute> apply(final StoreProvider storeProvider) {
-            return AttributeStore.Util.attributesOrNone(storeProvider, slaveHost);
-          }
-        });
-
-    return Iterables.any(slaveAttributes, new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+  private boolean isDedicated(IHostAttributes attributes) {
+    return Iterables.any(
+        attributes.getAttributes(),
+        new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
   }
 
   @Override
   public Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes attributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate) {
 
-    if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
+    if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
 
-    Optional<Veto> maintenanceVeto = getMaintenanceVeto(mode);
+    Optional<Veto> maintenanceVeto = getMaintenanceVeto(attributes.getMode());
     if (maintenanceVeto.isPresent()) {
       return maintenanceVeto.asSet();
     }
 
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
+        .addAll(getConstraintFilter(attributeAggregate, attributes).apply(task))
         .addAll(getResourceVetoes(offer, task))
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index 446dc74..6d75c3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -27,14 +27,14 @@ import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 
 /**
  * Servlet that exposes resource offers that the scheduler is currently retaining.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index ffcbc97..ffc30bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -30,6 +30,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.GuiceUtils.AllowUnchecked;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -39,10 +40,10 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskStatus;
@@ -55,6 +56,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.mesos.Protos.Offer;
+
 /**
  * Location for communication with mesos.
  */
@@ -90,7 +93,7 @@ class MesosSchedulerImpl implements Scheduler {
    * @param storage Store to save host attributes into.
    * @param lifecycle Application lifecycle manager.
    * @param taskLaunchers Task launchers, which will be used in order.  Calls to
-   *                      {@link TaskLauncher#willUse(Offer)} and
+   *                      {@link TaskLauncher#willUse(HostOffer)} and
    *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
    *                      launchers, ceasing after the first match (based on a return value of
    *                      {@code true}.
@@ -153,37 +156,30 @@ class MesosSchedulerImpl implements Scheduler {
   public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
     Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
 
-    // Store all host attributes in a single write operation to prevent other threads from
-    // securing the storage lock between saves.  We also save the host attributes before passing
-    // offers elsewhere to ensure that host attributes are available before attempting to
-    // schedule tasks associated with offers.
-    // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
-    //                offers when the host attributes cannot be found. (AURORA-137)
-
     executor.execute(new Runnable() {
       @Override
       public void run() {
+        // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+        //                offers when the host attributes cannot be found. (AURORA-137)
         storage.write(new MutateWork.NoResult.Quiet() {
           @Override
           protected void execute(MutableStoreProvider storeProvider) {
-            for (final Offer offer : offers) {
-              storeProvider.getAttributeStore().saveHostAttributes(
-                  AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer));
+            for (Offer offer : offers) {
+              IHostAttributes attributes =
+                  AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
+              storeProvider.getAttributeStore().saveHostAttributes(attributes);
+              if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+              }
+              totalResourceOffers.incrementAndGet();
+              for (TaskLauncher launcher : taskLaunchers) {
+                if (launcher.willUse(new HostOffer(offer, attributes))) {
+                  break;
+                }
+              }
             }
           }
         });
-
-        for (Offer offer : offers) {
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, String.format("Received offer: %s", offer));
-          }
-          totalResourceOffers.incrementAndGet();
-          for (TaskLauncher launcher : taskLaunchers) {
-            if (launcher.willUse(offer)) {
-              break;
-            }
-          }
-        }
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 077699f..86440eb 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -52,6 +52,7 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
  * All state-changing functions return their results.  Additionally, all state-changing functions
  * will ignore requests to change state of unknown hosts and subsequently omit these hosts from
  * return values.
+ * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus (immutable).
  */
 public interface MaintenanceController {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 9c9b659..77db411 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -20,6 +20,7 @@ import javax.inject.Inject;
 
 import com.google.common.base.Optional;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
@@ -29,12 +30,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 
 /**
  * Responsible for matching a task against an offer.
@@ -45,13 +45,13 @@ public interface TaskAssigner {
    * Tries to match a task against an offer.  If a match is found, the assigner should
    * make the appropriate changes to the task and provide a non-empty result.
    *
-   * @param hostOffer The resource offer.
+   * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
   Optional<TaskInfo> maybeAssign(
-      HostOffer hostOffer,
+      HostOffer offer,
       IScheduledTask task,
       AttributeAggregate attributeAggregate);
 
@@ -89,21 +89,20 @@ public interface TaskAssigner {
 
     @Override
     public Optional<TaskInfo> maybeAssign(
-        HostOffer hostOffer,
+        HostOffer offer,
         IScheduledTask task,
         AttributeAggregate attributeAggregate) {
 
       Set<Veto> vetoes = filter.filter(
-          ResourceSlot.from(hostOffer.getOffer()),
-          hostOffer.getOffer().getHostname(),
-          hostOffer.getMode(),
+          ResourceSlot.from(offer.getOffer()),
+          offer.getAttributes(),
           task.getAssignedTask().getTask(),
           Tasks.id(task),
           attributeAggregate);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(hostOffer.getOffer(), task));
+        return Optional.of(assign(offer.getOffer(), task));
       } else {
-        LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+        LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
             + ": " + vetoes);
         return Optional.absent();
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 844a38a..1c9904c 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -35,6 +35,7 @@ import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.base.Conversions;
@@ -49,8 +50,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
-
 /**
  * Module to configure export of cluster-wide resource allocation and consumption statistics.
  */
@@ -115,13 +114,13 @@ public class AsyncStatsModule extends AbstractModule {
     private static final Function<HostOffer, MachineResource> TO_RESOURCE =
         new Function<HostOffer, MachineResource>() {
           @Override
-          public MachineResource apply(HostOffer hostOffer) {
-            Resources resources = Resources.from(hostOffer.getOffer());
+          public MachineResource apply(HostOffer offer) {
+            Resources resources = Resources.from(offer.getOffer());
             IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
                 .setNumCpus(resources.getNumCpus())
                 .setRamMb(resources.getRam().as(Data.MB))
                 .setDiskMb(resources.getDisk().as(Data.MB)));
-            return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer()));
+            return new MachineResource(quota, Conversions.isDedicated(offer.getOffer()));
           }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 083a635..4673e80 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -22,14 +22,15 @@ import com.google.common.collect.Iterables;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -47,6 +48,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.HOST_CONSTRAINT;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertTrue;
 
@@ -60,7 +62,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
   private static final String TASK_ID_A = "task_id_a";
 
   private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build();
-  private static final Offer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
+  private static final HostOffer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
 
   private OfferQueue offerQueue;
   private StateManager stateManager;
@@ -171,13 +173,13 @@ public class UserTaskLauncherTest extends EasyMockTest {
     launcher.statusUpdate(status);
   }
 
-  private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+  private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
       double ramMb, double diskMb) {
     return createOffer(slave, slaveHost, cpu, ramMb, diskMb,
         ImmutableSet.<Pair<Integer, Integer>>of());
   }
 
-  private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+  private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
       double ramMb, double diskMb, Set<Pair<Integer, Integer>> ports) {
 
     Ranges portRanges = Ranges.newBuilder()
@@ -189,7 +191,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
         }))
         .build();
 
-    return Offer.newBuilder()
+    Offer mesosOffer = Offer.newBuilder()
         .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.CPUS)
             .setScalar(Scalar.newBuilder().setValue(cpu)))
         .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.RAM_MB)
@@ -206,5 +208,6 @@ public class UserTaskLauncherTest extends EasyMockTest {
         .setFrameworkId(FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build())
         .setId(OFFER_ID)
         .build();
+    return new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 758a8d4..422d5a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -30,20 +30,22 @@ import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -58,6 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.INSUFFICIENT_OFFERS_STAT_NAME;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.LOST_TASKS_STAT_NAME;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -67,13 +70,15 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   private static final String HOST = "slave-host";
 
-  private static final Offer OFFER = Offer.newBuilder()
+  private static final Offer MESOS_OFFER = Offer.newBuilder()
       .setSlaveId(SlaveID.newBuilder().setValue("slave-id"))
       .setHostname(HOST)
       .setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build())
       .setId(OfferID.newBuilder().setValue("offer-id"))
       .addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList())
       .build();
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
 
   private static final String JOB_A = "jobA";
   private static final String TASK_UUID = "gc";
@@ -166,18 +171,19 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         Resources.subtract(
             GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES,
             GcExecutorLauncher.EPSILON).toResourceList();
-    Offer smallOffer = OFFER.toBuilder()
+    Offer smallOffer = MESOS_OFFER.toBuilder()
         .clearResources()
         .addAllResources(resources)
         .build();
     assertEquals(0, insufficientOffers.get());
-    assertFalse(gcExecutorLauncher.willUse(smallOffer));
+    assertFalse(gcExecutorLauncher.willUse(
+        new HostOffer(smallOffer, IHostAttributes.build(new HostAttributes()))));
     assertEquals(1, insufficientOffers.get());
   }
 
   private static TaskStatus makeStatus(String taskId) {
     return TaskStatus.newBuilder()
-        .setSlaveId(OFFER.getSlaveId())
+        .setSlaveId(OFFER.getOffer().getSlaveId())
         .setState(TaskState.TASK_RUNNING)
         .setTaskId(TaskID.newBuilder().setValue(taskId))
         .build();
@@ -220,8 +226,12 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS);
     AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses);
     TaskInfo task = GcExecutorLauncher.makeGcTask(
-        HOST, OFFER.getSlaveId(), SETTINGS.getGcExecutorPath().get(), TASK_UUID, message);
-    driver.launchTask(OFFER.getId(), task);
+        HOST,
+        OFFER.getOffer().getSlaveId(),
+        SETTINGS.getGcExecutorPath().get(),
+        TASK_UUID,
+        message);
+    driver.launchTask(OFFER.getOffer().getId(), task);
   }
 
   private IScheduledTask makeTask(String jobName, ScheduleStatus status) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index e2a198a..4cf602a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -13,154 +13,124 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
 import com.google.common.testing.TearDown;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.async.OfferQueue.LaunchException;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class OfferQueueImplTest extends EasyMockTest {
 
-  private static final Amount<Integer, Time> RETURN_DELAY = Amount.of(1, Time.DAYS);
+  private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
   private static final String HOST_A = "HOST_A";
-  private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", HOST_A);
+  private static final HostOffer OFFER_A = new HostOffer(
+      Offers.makeOffer("OFFER_A", HOST_A),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final String HOST_B = "HOST_B";
-  private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", HOST_B);
+  private static final HostOffer OFFER_B = new HostOffer(
+      Offers.makeOffer("OFFER_B", HOST_B),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final String HOST_C = "HOST_C";
-  private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", HOST_C);
+  private static final HostOffer OFFER_C = new HostOffer(
+      Offers.makeOffer("OFFER_C", HOST_C),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
 
   private Driver driver;
-  private ScheduledExecutorService executor;
-  private ExecutorService testExecutor;
-  private MaintenanceController maintenanceController;
+  private FakeScheduledExecutor clock;
   private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
   private OfferQueueImpl offerQueue;
 
   @Before
   public void setUp() {
     driver = createMock(Driver.class);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
-    executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    testExecutor = Executors.newCachedThreadPool(threadFactory);
+    ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+
     addTearDown(new TearDown() {
       @Override
       public void tearDown() throws Exception {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-        new ExecutorServiceShutdown(testExecutor, Amount.of(1L, Time.SECONDS)).execute();
+        clock.assertEmpty();
       }
     });
-    maintenanceController = createMock(MaintenanceController.class);
     offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
-      public Amount<Integer, Time> get() {
+      public Amount<Long, Time> get() {
         return RETURN_DELAY;
       }
     };
-    offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenanceController);
+    offerQueue = new OfferQueueImpl(driver, returnDelay, executorMock);
   }
 
   @Test
-  public void testNoDeadlock() throws Exception {
-    // Test that a blocked call to maintenanceController does not result in a deadlock between
-    // the intrinsic lock and the storage lock.
-    final CountDownLatch launchAttempted = new CountDownLatch(1);
-    expect(maintenanceController.getMode(HOST_A)).andAnswer(new IAnswer<MaintenanceMode>() {
-      @Override
-      public MaintenanceMode answer() throws InterruptedException {
-        launchAttempted.await();
-        return MaintenanceMode.NONE;
-      }
-    });
-
-    control.replay();
+  public void testOffersSorted() throws Exception {
+    // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last.
 
-    final CountDownLatch offerAdded = new CountDownLatch(1);
-    testExecutor.submit(new Runnable() {
-      @Override
-      public void run() {
-        offerQueue.addOffer(OFFER_A);
-        offerAdded.countDown();
-      }
-    });
-    testExecutor.submit(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          offerQueue.launchFirst(offerAcceptor);
-          launchAttempted.countDown();
-        } catch (LaunchException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    });
+    HostOffer offerA = setMode(OFFER_A, DRAINING);
+    HostOffer offerC = setMode(OFFER_C, DRAINING);
 
-    launchAttempted.await();
-    offerAdded.await();
-  }
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.of(task));
+    driver.launchTask(OFFER_B.getOffer().getId(), task);
 
-  @Test
-  public void testOffersSorted() throws Exception {
-    MaintenanceMode modeA = MaintenanceMode.NONE;
-    MaintenanceMode modeB = MaintenanceMode.DRAINING;
-    MaintenanceMode modeC = MaintenanceMode.NONE;
+    driver.declineOffer(offerA.getOffer().getId());
+    driver.declineOffer(offerC.getOffer().getId());
 
-    HostOffer hostOfferA = new HostOffer(OFFER_A, modeA);
-    HostOffer hostOfferB = new HostOffer(OFFER_B, modeB);
-    HostOffer hostOfferC = new HostOffer(OFFER_C, modeC);
+    control.replay();
 
-    expect(maintenanceController.getMode(HOST_A)).andReturn(modeA);
-    expect(maintenanceController.getMode(HOST_B)).andReturn(modeB);
-    expect(maintenanceController.getMode(HOST_C)).andReturn(modeC);
-    expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.<TaskInfo>absent());
+    offerQueue.addOffer(offerA);
+    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(offerC);
+    assertTrue(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
+  }
 
+  @Test
+  public void testFlushOffers() throws Exception {
     control.replay();
 
     offerQueue.addOffer(OFFER_A);
     offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(OFFER_C);
+    offerQueue.driverDisconnected(new DriverDisconnected());
     assertFalse(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testFlushOffers() throws Exception {
-    expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE);
-    expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.NONE);
+  public void testDeclineOffer() throws Exception {
+    driver.declineOffer(OFFER_A.getOffer().getId());
 
     control.replay();
 
     offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.driverDisconnected(new DriverDisconnected());
-    assertFalse(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
+  }
+
+  private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {
+    return new HostOffer(
+        offer.getOffer(),
+        IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index c0fa462..59bfbcb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
@@ -42,6 +43,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -49,7 +51,6 @@ import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -60,6 +61,7 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
@@ -70,7 +72,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
@@ -103,7 +104,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
-  private MaintenanceController maintenance;
+  private StatsProvider statsProvider;
   private OfferQueue offerQueue;
   private AttributeAggregate emptyJob;
 
@@ -112,8 +113,8 @@ public class PreemptorImplTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
-    maintenance = createMock(MaintenanceController.class);
     clock = new FakeClock();
+    statsProvider = new FakeStatsProvider();
     offerQueue = createMock(OfferQueue.class);
     emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -128,7 +129,7 @@ public class PreemptorImplTest extends EasyMockTest {
         schedulingFilter,
         PREEMPTION_DELAY,
         clock,
-        maintenance);
+        statsProvider);
 
     preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
   }
@@ -150,12 +151,10 @@ public class PreemptorImplTest extends EasyMockTest {
         IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
   }
 
-  private void expectGetMaintenance(String host) {
-    expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE);
-  }
-
   @Test
   public void testPreempted() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
     runOnHost(lowPriority, HOST_A);
@@ -168,8 +167,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowPriority);
 
-    expectGetMaintenance(HOST_A);
-
     expectFiltering();
     expectPreempted(lowPriority);
 
@@ -179,6 +176,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testLowestPriorityPreempted() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
     runOnHost(lowPriority, HOST_A);
@@ -194,7 +193,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowerPriority, lowerPriority);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowerPriority);
 
@@ -204,6 +202,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testOnePreemptableTask() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
     runOnHost(highPriority, HOST_A);
@@ -222,7 +222,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(pendingPriority);
     expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowestPriority);
 
@@ -250,6 +249,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testProductionPreemptingNonproduction() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -263,7 +264,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -273,6 +273,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -286,7 +288,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -315,7 +316,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task can preempt 2 tasks on the same host.
   @Test
   public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
 
@@ -347,7 +348,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we select the minimal number of tasks to preempt
   @Test
   public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
 
@@ -382,7 +383,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task *never* preempts a production task from another job.
   @Test
   public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
     p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
 
@@ -407,7 +408,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -435,7 +436,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -468,7 +469,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
   @Test
   public void testPreemptWithLargeOffer() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -524,7 +525,7 @@ public class PreemptorImplTest extends EasyMockTest {
         schedulingFilter,
         PREEMPTION_DELAY,
         clock,
-        maintenance);
+        statsProvider);
 
     assertEquals(
         Optional.<String>absent(),
@@ -556,7 +557,9 @@ public class PreemptorImplTest extends EasyMockTest {
         .transform(new Function<Offer, HostOffer>() {
           @Override
           public HostOffer apply(Offer offer) {
-            return new HostOffer(offer, MaintenanceMode.NONE);
+            return new HostOffer(
+                offer,
+                IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
           }
         });
     expect(offerQueue.getOffers()).andReturn(hostOffers);
@@ -569,8 +572,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private IExpectationSetters<Set<Veto>> expectFiltering() {
     return expect(schedulingFilter.filter(
         EasyMock.<ResourceSlot>anyObject(),
-        EasyMock.eq(HOST_A),
-        EasyMock.eq(MaintenanceMode.NONE),
+        EasyMock.<IHostAttributes>anyObject(),
         EasyMock.<ITaskConfig>anyObject(),
         EasyMock.<String>anyObject(),
         EasyMock.eq(emptyJob))).andAnswer(
@@ -659,6 +661,5 @@ public class PreemptorImplTest extends EasyMockTest {
 
     expect(this.storageUtil.attributeStore.getHostAttributes(host))
         .andReturn(Optional.of(hostAttrs)).anyTimes();
-    expect(this.maintenance.getMode(host)).andReturn(NONE).anyTimes();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 0e699c9..0e8a98c 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -29,11 +29,13 @@ import com.twitter.common.util.Clock;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -48,6 +50,7 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -58,7 +61,6 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -69,8 +71,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A = makeTask("a");
   private static final IScheduledTask TASK_B = makeTask("b");
-  private static final HostOffer OFFER =
-      new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
+  private static final HostOffer OFFER = new HostOffer(
+      Offers.makeOffer("OFFER_A", "HOST_A"),
+      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;