You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/28 19:02:35 UTC

[1/3] aurora git commit: Enable custom offer scoring modules for task assignment

Repository: aurora
Updated Branches:
  refs/heads/master 21af250c9 -> 80139da46


http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 6b18296..ff80baa 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -32,9 +33,12 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -54,11 +58,13 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OUTSTANDING_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.VETO_EVALUATED_OFFERS;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -66,6 +72,8 @@ import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -89,21 +97,25 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final String HOST_C = "HOST_C";
   private static final HostOffer OFFER_C = new HostOffer(
       Offers.makeOffer("OFFER_C", HOST_C),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+      IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
   private static final int PORT = 1000;
   private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
   private static final IScheduledTask TASK = makeTask("id", JOB);
+  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
+      TASK.getAssignedTask().getTask(),
+      ResourceBag.EMPTY,
+      empty());
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
       .setTaskId(Protos.TaskID.newBuilder().setValue(Tasks.id(TASK)))
       .setAgentId(MESOS_OFFER.getAgentId())
       .build();
-  private static Operation launch = Operation.newBuilder()
+  private static final Operation LAUNCH = Operation.newBuilder()
       .setType(Operation.Type.LAUNCH)
       .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO))
       .build();
-  private static final List<Operation> OPERATIONS = ImmutableList.of(launch);
+  private static final List<Operation> OPERATIONS = ImmutableList.of(LAUNCH);
   private static final long OFFER_FILTER_SECONDS = 0;
   private static final Filters OFFER_FILTER = Filters.newBuilder()
       .setRefuseSeconds(OFFER_FILTER_SECONDS)
@@ -113,6 +125,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   private Driver driver;
   private OfferManagerImpl offerManager;
   private FakeStatsProvider statsProvider;
+  private SchedulingFilter schedulingFilter;
 
   @Before
   public void setUp() {
@@ -125,7 +138,13 @@ public class OfferManagerImplTest extends EasyMockTest {
         FAKE_TICKER
     );
     statsProvider = new FakeStatsProvider();
-    offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop());
+    schedulingFilter = createMock(SchedulingFilter.class);
+
+    offerManager = new OfferManagerImpl(driver,
+        offerSettings,
+        statsProvider,
+        new Noop(),
+        schedulingFilter);
   }
 
   @Test
@@ -136,11 +155,11 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(hostOfferB);
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(hostOfferC);
+    offerManager.add(hostOfferB);
+    offerManager.add(OFFER_A);
+    offerManager.add(hostOfferC);
 
-    List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers());
+    List<HostOffer> actual = ImmutableList.copyOf(offerManager.getAll());
 
     assertEquals(
         // hostOfferC has a further away start time, so it should be preferred.
@@ -160,15 +179,15 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(offerA);
+    offerManager.add(offerA);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(OFFER_B);
+    offerManager.add(OFFER_B);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(offerC);
+    offerManager.add(offerC);
     assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(
         ImmutableSet.of(OFFER_B, offerA, offerC),
-        ImmutableSet.copyOf(offerManager.getOffers()));
+        ImmutableSet.copyOf(offerManager.getAll()));
     offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
@@ -179,19 +198,19 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll()));
 
     HostOffer offerA = setMode(OFFER_A, DRAINING);
     offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
-    assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers()));
+    assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getAll()));
 
     offerA = setMode(OFFER_A, NONE);
     HostOffer offerB = setMode(OFFER_B, DRAINING);
     offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
     offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
-    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll()));
   }
 
   @Test
@@ -201,9 +220,9 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
@@ -211,74 +230,83 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testGetOffersReturnsAllOffers() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
-    offerManager.cancelOffer(OFFER_A_ID);
+    offerManager.cancel(OFFER_A_ID);
     assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getAll()));
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
   public void testOfferFilteringDueToStaticBan() {
+    expectFilterNone();
+
     control.replay();
 
     // Static ban ignored when now offers.
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
 
     // Add static ban.
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
   }
 
   @Test
   public void testStaticBanExpiresAfterMaxHoldTime() throws InterruptedException {
+    expectFilterNone();
+
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban expires after maximum amount of time an offer is held.
     FAKE_TICKER.advance(RETURN_DELAY);
     offerManager.cleanupStaticBans();
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
   @Test
   public void testStaticBanIsClearedOnDriverDisconnect() {
+    expectFilterNone();
+
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
     offerManager.driverDisconnected(new DriverDisconnected());
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
   }
 
   @Test
   public void testGetOffer() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId()));
+    offerManager.add(OFFER_A);
+    assertEquals(Optional.of(OFFER_A), offerManager.get(OFFER_A.getOffer().getAgentId()));
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
@@ -289,7 +317,7 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     offerManager.launchTask(OFFER_A_ID, TASK_INFO);
   }
 
@@ -308,8 +336,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testFlushOffers() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.driverDisconnected(new DriverDisconnected());
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
@@ -319,26 +347,29 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testCancelFailure() {
     control.replay();
 
-    offerManager.cancelOffer(OFFER_A.getOffer().getId());
+    offerManager.cancel(OFFER_A.getOffer().getId());
     assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
   }
 
   @Test
   public void testBanAndUnbanOffer() {
+    expectFilterNone();
+
     control.replay();
 
     // After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it.
-    offerManager.banOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
+    offerManager.ban(OFFER_A_ID);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
 
-    offerManager.cancelOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
+    offerManager.cancel(OFFER_A_ID);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
   }
 
   private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -363,7 +394,7 @@ public class OfferManagerImplTest extends EasyMockTest {
             RETURN_DELAY,
             Long.MAX_VALUE,
             FAKE_TICKER);
-    return new OfferManagerImpl(driver, settings, statsProvider, new Noop());
+    return new OfferManagerImpl(driver, settings, statsProvider, new Noop(), schedulingFilter);
   }
 
   @Test
@@ -377,25 +408,25 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 24.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 5.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 10.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -410,7 +441,6 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 23.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer(
             "host1",
@@ -418,21 +448,22 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 24.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 1.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -442,25 +473,25 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer small = setMode(new HostOffer(
         offer("host1", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 5.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 10.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -470,25 +501,25 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer small = setMode(new HostOffer(
         offer("host1", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 5.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 10.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -502,14 +533,12 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(RAM_MB, 2.0),
             mesosScalar(DISK_MB, 3.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2",
             mesosScalar(CPUS, 1.0),
             mesosScalar(RAM_MB, 3.0),
             mesosScalar(DISK_MB, 2.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3",
             mesosScalar(CPUS, 10.0),
@@ -518,16 +547,18 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(DISK_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(small);
+    cpuManager.add(large);
+    cpuManager.add(medium);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -545,13 +576,14 @@ public class OfferManagerImplTest extends EasyMockTest {
         driver,
         settings,
         statsProvider,
-        new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock));
+        new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock),
+        schedulingFilter);
 
     driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
     clock.advance(RETURN_DELAY);
@@ -575,11 +607,154 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.banOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(sameAgent);
-    offerManager.cancelOffer(OFFER_A_ID);
-    offerManager.addOffer(sameAgent2);
-    assertEquals(ImmutableSet.of(sameAgent2), offerManager.getOffers());
+    offerManager.ban(OFFER_A_ID);
+    offerManager.add(OFFER_A);
+    offerManager.add(sameAgent);
+    offerManager.cancel(OFFER_A_ID);
+    offerManager.add(sameAgent2);
+    assertEquals(ImmutableSet.of(sameAgent2), offerManager.getAll());
+  }
+
+  private void expectFilterNone() {
+    // Most tests will use a permissive scheduling filter
+    expect(schedulingFilter.filter(anyObject(), anyObject()))
+        .andReturn(ImmutableSet.of())
+        .anyTimes();
+  }
+
+  @Test
+  public void testGetMatchingSingleAgent() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(Optional.of(OFFER_A),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+  }
+
+  @Test
+  public void testGetMatchingNoGloballyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    offerManager.ban(OFFER_A_ID);
+    assertEquals(Optional.absent(),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+    assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetMatchingNoVetoed() {
+    // Calling getMatching when a veto is present should return an empty option. Additionally,
+    // it should not statically ban the offer if it is vetoed.
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST))
+        .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(Optional.absent(),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatching() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatchingNoGloballyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    offerManager.ban(OFFER_B.getOffer().getId());
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatchingNoStaticallyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
+        offerManager.getStaticBans());
+  }
+
+  @Test
+  public void testGetAllMatchingIgnoreNoCpusAndMem() {
+    expectFilterNone();
+
+    HostOffer empty = setMode(new HostOffer(
+        offer("host1",
+            mesosScalar(CPUS, 0),
+            mesosScalar(RAM_MB, 0),
+            mesosScalar(DISK_MB, 3.0)),
+        HOST_ATTRIBUTES_A), NONE);
+
+    control.replay();
+    offerManager.add(empty);
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_A),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(empty, OFFER_A),
+        ImmutableSet.copyOf(offerManager.getAll()));
+  }
+
+  @Test
+  public void testGetAllMatchingNoVetoed() {
+    // Calling getAllMatching should statically ban the offer as well if it is statically vetoed
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST))
+        .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_B, false), EMPTY_REQUEST))
+        .andReturn(ImmutableSet.of());
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_C, false), EMPTY_REQUEST))
+        .andReturn(ImmutableSet.of(SchedulingFilter.Veto.unsatisfiedLimit("test_limit")));
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_B),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),
+        offerManager.getStaticBans());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
index c76b3e3..a346e44 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -300,7 +300,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
   }
 
   private void expectOffers(HostOffer... offers) {
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+    expect(offerManager.getAll()).andReturn(ImmutableSet.copyOf(offers));
   }
 
   private void expectGetClusterState(IScheduledTask... returnedTasks) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index f14cba1..1061583 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -82,7 +82,7 @@ public class PreemptorImplTest extends EasyMockTest {
     slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
     statsProvider = new FakeStatsProvider();
     OfferManager offerManager = createMock(OfferManager.class);
-    expect(offerManager.getOffer(anyObject(Protos.AgentID.class)))
+    expect(offerManager.get(anyObject(Protos.AgentID.class)))
         .andReturn(Optional.of(OFFER))
         .anyTimes();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index 97c238c..815a3ef 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -25,8 +25,8 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.scheduling.TaskAssigner;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
new file mode 100644
index 0000000..627055c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class FirstFitOfferSelectorTest extends EasyMockTest {
+
+  private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
+      TASK.getTask(),
+      ResourceBag.EMPTY,
+      empty());
+
+  private OfferSelector firstFitOfferSelector;
+
+  @Before
+  public void setUp() {
+    firstFitOfferSelector = new FirstFitOfferSelector();
+  }
+
+  @Test
+  public void testNoOffers() {
+    Iterable<HostOffer> offers = ImmutableList.of();
+
+    control.replay();
+
+    assertFalse(firstFitOfferSelector.select(offers, EMPTY_REQUEST).isPresent());
+  }
+
+  @Test
+  public void testReturnFirstOffer() {
+    HostOffer offerA = createMock(HostOffer.class);
+    HostOffer offerB = createMock(HostOffer.class);
+    Iterable<HostOffer> offers = ImmutableList.of(offerA, offerB);
+
+    control.replay();
+
+    assertEquals(offerA, firstFitOfferSelector.select(offers, EMPTY_REQUEST).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
new file mode 100644
index 0000000..e094950
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.LAUNCH_FAILED_MSG;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.mesos.v1.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TaskAssignerImplTest extends EasyMockTest {
+
+  private static final int PORT = 1000;
+  private static final Offer MESOS_OFFER =
+      offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
+  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+          .setHost(MESOS_OFFER.getHostname())
+          .setAttributes(ImmutableSet.of(
+              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
+  private static final IScheduledTask TASK = makeTask("id", JOB);
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
+      .setName("taskName")
+      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+      .setAgentId(MESOS_OFFER.getAgentId())
+      .build();
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+  private static final HostOffer OFFER_2 = new HostOffer(
+      Offer.newBuilder()
+          .setId(OfferID.newBuilder().setValue("offerId0"))
+          .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+          .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
+          .setHostname("hostName0")
+          .addResources(mesosRange(PORTS, PORT))
+          .addResources(mesosScalar(CPUS, 1))
+          .addResources(mesosScalar(RAM_MB, 1024))
+          .build(),
+      IHostAttributes.build(new HostAttributes()));
+
+  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+  private AttributeAggregate aggregate;
+  private ResourceRequest resourceRequest;
+
+  private MutableStoreProvider storeProvider;
+  private StateManager stateManager;
+  private MesosTaskFactory taskFactory;
+  private OfferManager offerManager;
+  private TaskAssignerImpl assigner;
+  private TierManager tierManager;
+  private FakeStatsProvider statsProvider;
+  private UpdateAgentReserver updateAgentReserver;
+
+  @Before
+  public void setUp() throws Exception {
+    storeProvider = createMock(MutableStoreProvider.class);
+    taskFactory = createMock(MesosTaskFactory.class);
+    stateManager = createMock(StateManager.class);
+    offerManager = createMock(OfferManager.class);
+    tierManager = createMock(TierManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
+    statsProvider = new FakeStatsProvider();
+    // TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing,
+    // but if its implementation becomes more complex we may need to replace it with a fake.
+    OfferSelector offerSelector = new FirstFitOfferSelector();
+    assigner = new TaskAssignerImpl(
+        stateManager,
+        taskFactory,
+        offerManager,
+        tierManager,
+        updateAgentReserver,
+        statsProvider,
+        offerSelector);
+    aggregate = empty();
+    resourceRequest = new ResourceRequest(
+        TASK.getAssignedTask().getTask(),
+        ResourceBag.EMPTY,
+        aggregate);
+  }
+
+  @Test
+  public void testAssignNoTasks() throws Exception {
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+  }
+
+  @Test
+  public void testAssignmentClearedOnError() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER, OFFER_2));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectAssignTask(MESOS_OFFER);
+    expect(stateManager.changeState(
+        storeProvider,
+        Tasks.id(TASK),
+        Optional.of(PENDING),
+        LOST,
+        LAUNCH_FAILED_MSG))
+        .andReturn(StateChangeResult.SUCCESS);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+    // Ensures scheduling loop terminates on the first launch failure.
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+  }
+
+  @Test
+  public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expectNoUpdateReservations(0);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER));
+
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+  }
+
+  @Test
+  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
+    // Ensures slave/task reservation relationship is only enforced in slave->task direction
+    // and permissive in task->slave direction. In other words, a task with a slave reservation
+    // should still be tried against other unreserved slaves.
+    expectNoUpdateReservations(1);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER_2, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectAssignTask(OFFER_2.getOffer());
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
+        .andReturn(TASK_INFO);
+    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+  }
+
+  @Test
+  public void testResourceMapperCallback() {
+    AssignedTask builder = TASK.newBuilder().getAssignedTask();
+    builder.unsetAssignedPorts();
+
+    control.replay();
+
+    assertEquals(
+        TASK.getAssignedTask(),
+        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
+  }
+
+  @Test
+  public void testAssignToReservedAgent() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+  }
+
+  @Test
+  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.absent());
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectLastCall();
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(empty(), aggregate);
+  }
+
+  @Test
+  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    // Normal scheduling loop for the remaining task...
+    IScheduledTask secondTask = makeTask("another-task", JOB, 9999);
+    TaskInfo secondTaskInfo = TaskInfo.newBuilder()
+        .setName("another-task")
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask)))
+        .setAgentId(MESOS_OFFER.getAgentId())
+        .build();
+    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
+    ImmutableSet<HostOffer> matchingOffers = ImmutableSet.of(OFFER);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(matchingOffers);
+    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
+        .andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER, secondTask);
+    offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo);
+    expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(secondTaskInfo);
+
+    control.replay();
+
+    assertEquals(
+        Tasks.ids(TASK, secondTask),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            GROUP_KEY,
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                secondTask.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+  }
+
+  private void expectAssignTask(Offer offer) {
+    expectAssignTask(offer, TASK);
+  }
+
+  private void expectAssignTask(Offer offer, IScheduledTask task) {
+    expect(stateManager.assignTask(
+        eq(storeProvider),
+        eq(Tasks.id(task)),
+        eq(offer.getHostname()),
+        eq(offer.getAgentId()),
+        anyObject())).andReturn(task.getAssignedTask());
+  }
+
+  private void expectNoUpdateReservations(int offers) {
+    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
+    for (int i = 0; i < offers; i++) {
+      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 7a4525a..260ff9d 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -46,9 +46,7 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
deleted file mode 100644
index 58f9de2..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
-import org.apache.mesos.v1.Protos.TaskID;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class FirstFitTaskAssignerTest extends EasyMockTest {
-
-  private static final int PORT = 1000;
-  private static final Offer MESOS_OFFER =
-      offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
-  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
-  private static final HostOffer OFFER =
-      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
-          .setHost(MESOS_OFFER.getHostname())
-          .setAttributes(ImmutableSet.of(
-              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
-  private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
-  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
-      .setName("taskName")
-      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
-      .setAgentId(MESOS_OFFER.getAgentId())
-      .build();
-  private static final IInstanceKey INSTANCE_KEY =
-      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
-  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
-  private static final UnusedResource UNUSED = new UnusedResource(
-      bagFromMesosResources(MESOS_OFFER.getResourcesList()),
-      OFFER.getAttributes());
-  private static final HostOffer OFFER_2 = new HostOffer(
-      Offer.newBuilder()
-          .setId(OfferID.newBuilder().setValue("offerId0"))
-          .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-          .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-          .setHostname("hostName0")
-          .addResources(mesosRange(PORTS, PORT))
-          .addResources(mesosScalar(CPUS, 1))
-          .addResources(mesosScalar(RAM_MB, 1024))
-          .build(),
-      IHostAttributes.build(new HostAttributes()));
-
-  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
-
-  private ResourceRequest resourceRequest;
-
-  private MutableStoreProvider storeProvider;
-  private StateManager stateManager;
-  private SchedulingFilter filter;
-  private MesosTaskFactory taskFactory;
-  private OfferManager offerManager;
-  private FirstFitTaskAssigner assigner;
-  private TierManager tierManager;
-  private FakeStatsProvider statsProvider;
-  private UpdateAgentReserver updateAgentReserver;
-
-  @Before
-  public void setUp() throws Exception {
-    storeProvider = createMock(MutableStoreProvider.class);
-    filter = createMock(SchedulingFilter.class);
-    taskFactory = createMock(MesosTaskFactory.class);
-    stateManager = createMock(StateManager.class);
-    offerManager = createMock(OfferManager.class);
-    tierManager = createMock(TierManager.class);
-    updateAgentReserver = createMock(UpdateAgentReserver.class);
-    statsProvider = new FakeStatsProvider();
-    assigner = new FirstFitTaskAssigner(
-        stateManager,
-        filter,
-        taskFactory,
-        offerManager,
-        tierManager,
-        updateAgentReserver,
-        statsProvider);
-    resourceRequest = new ResourceRequest(
-        TASK.getAssignedTask().getTask(),
-        ResourceBag.EMPTY,
-        empty());
-  }
-
-  @Test
-  public void testAssignNoTasks() throws Exception {
-    control.replay();
-
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
-  }
-
-  @Test
-  public void testAssignPartialNoVetoes() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithNoStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentClearedOnError() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(stateManager.changeState(
-        storeProvider,
-        Tasks.id(TASK),
-        Optional.of(PENDING),
-        LOST,
-        LAUNCH_FAILED_MSG))
-        .andReturn(StateChangeResult.SUCCESS);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    // Ensures scheduling loop terminates on the first launch failure.
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentSkippedForReservedSlave() throws Exception {
-    expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
-                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
-    assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
-    // Ensures slave/task reservation relationship is only enforced in slave->task direction
-    // and permissive in task->slave direction. In other words, a task with a slave reservation
-    // should still be tried against other unreserved slaves.
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
-            OFFER_2.getAttributes()),
-        resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(OFFER_2.getOffer());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception {
-    // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified.
-    HostOffer mismatched = new HostOffer(
-        Offer.newBuilder()
-            .setId(OfferID.newBuilder().setValue("offerId0"))
-            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-            .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-            .setHostname("hostName0")
-            .addResources(mesosRange(PORTS, PORT))
-            .addResources(mesosScalar(CPUS, 1))
-            .addResources(mesosScalar(RAM_MB, 1024))
-            .build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    expectNoUpdateReservations(2);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(mismatched.getOffer().getResourcesList()),
-            mismatched.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
-    offerManager.banOfferForTaskGroup(mismatched.getOffer().getId(), GROUP_KEY);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of());
-
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer(), false))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testResourceMapperCallback() {
-    AssignedTask builder = TASK.newBuilder().getAssignedTask();
-    builder.unsetAssignedPorts();
-
-    control.replay();
-
-    assertEquals(
-        TASK.getAssignedTask(),
-        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
-  }
-
-  @Test
-  public void testAssignToReservedAgent() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-    expectLastCall();
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
-    AttributeAggregate aggregate = empty();
-    ResourceRequest resources = new ResourceRequest(
-        TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    // Normal scheduling loop for the remaining task...
-    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
-        .andReturn(ImmutableSet.of());
-    expect(filter.filter(UNUSED, resources))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resources,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("another-task", JOB, 9999).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testSkipsOffersWithNoMemAndNoCpu() {
-    expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    // Offer lacks CPU.
-    Offer mesosOffer = offer(mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
-    HostOffer offer = new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()
-        .setHost(mesosOffer.getHostname())
-        .setAttributes(ImmutableSet.of(
-            new Attribute("host", ImmutableSet.of(mesosOffer.getHostname()))))));
-
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer));
-
-    control.replay();
-
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  private void expectAssignTask(Offer offer) {
-    expect(stateManager.assignTask(
-        eq(storeProvider),
-        eq(Tasks.id(TASK)),
-        eq(offer.getHostname()),
-        eq(offer.getAgentId()),
-        anyObject())).andReturn(TASK.getAssignedTask());
-  }
-
-  private void expectNoUpdateReservations(int offers) {
-    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
-    for (int i = 0; i < offers; i++) {
-      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
index f8e8023..dfcbb4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
@@ -45,7 +45,7 @@ public class AsyncStatsModuleTest extends EasyMockTest {
   @Test
   public void testOfferAdapter() {
     OfferManager offerManager = createMock(OfferManager.class);
-    expect(offerManager.getOffers()).andReturn(ImmutableList.of(
+    expect(offerManager.getAll()).andReturn(ImmutableList.of(
         new HostOffer(Protos.Offer.newBuilder()
             .setId(Protos.OfferID.newBuilder().setValue("offerId"))
             .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))


[2/3] aurora git commit: Enable custom offer scoring modules for task assignment

Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
new file mode 100644
index 0000000..ee65bab
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+public class FirstFitOfferSelector implements OfferSelector {
+
+  @Override
+  public Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest) {
+
+    return Optional.fromNullable(Iterables.getFirst(offers, null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
new file mode 100644
index 0000000..4d36487
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+
+public class FirstFitOfferSelectorModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(OfferSelector.class).to(FirstFitOfferSelector.class);
+    bind(FirstFitOfferSelector.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
new file mode 100644
index 0000000..c95b980
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+/**
+ * Injected into {@link TaskAssignerImpl}, this class scores the offers available and returns an
+ * option containing the offer to use.
+ */
+public interface OfferSelector {
+
+  /**
+   * Score offers that fit within the given {@link ResourceRequest} and return an option containing
+   * the offer to use for assignment.
+   *
+   * @param offers A stream of offers that match the given {@link ResourceRequest}.
+   * @param resourceRequest The {@link ResourceRequest} for the task to assign.
+   * @return An {@link Optional} containing the offer to use.
+   */
+  Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 0796712..f72dacd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -139,8 +139,8 @@ public class SchedulingModule extends AbstractModule {
         bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
         bind(BiCache.BiCacheSettings.class).toInstance(
             new BiCache.BiCacheSettings(options.reservationDuration, "reservation"));
-        bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
-        bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+        bind(TaskSchedulerImpl.class).in(Singleton.class);
         expose(TaskScheduler.class);
       }
     });

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
new file mode 100644
index 0000000..87619b5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
+/**
+ * Responsible for matching a task against an offer and launching it.
+ */
+public interface TaskAssigner {
+  /**
+   * Tries to match a task against an offer.  If a match is found, the assigner makes the
+   * appropriate changes to the task and requests task launch.
+   *
+   * @param storeProvider Storage provider.
+   * @param resourceRequest The request for resources being scheduled.
+   * @param groupKey Task group key.
+   * @param tasks Tasks to assign.
+   * @param preemptionReservations Slave reservations.
+   * @return Successfully assigned task IDs.
+   */
+  Set<String> maybeAssign(
+      MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Iterable<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
new file mode 100644
index 0000000..a1dd74f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+
+public class TaskAssignerImpl implements TaskAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class);
+
+  @VisibleForTesting
+  static final Optional<String> LAUNCH_FAILED_MSG =
+      Optional.of("Unknown exception attempting to schedule task.");
+  @VisibleForTesting
+  static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
+
+  private final AtomicLong launchFailures;
+
+  private final StateManager stateManager;
+  private final MesosTaskFactory taskFactory;
+  private final OfferManager offerManager;
+  private final TierManager tierManager;
+  private final UpdateAgentReserver updateAgentReserver;
+  private final OfferSelector offerSelector;
+
+  @Inject
+  public TaskAssignerImpl(
+      StateManager stateManager,
+      MesosTaskFactory taskFactory,
+      OfferManager offerManager,
+      TierManager tierManager,
+      UpdateAgentReserver updateAgentReserver,
+      StatsProvider statsProvider,
+      OfferSelector offerSelector) {
+
+    this.stateManager = requireNonNull(stateManager);
+    this.taskFactory = requireNonNull(taskFactory);
+    this.offerManager = requireNonNull(offerManager);
+    this.tierManager = requireNonNull(tierManager);
+    this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
+    this.updateAgentReserver = requireNonNull(updateAgentReserver);
+    this.offerSelector = requireNonNull(offerSelector);
+  }
+
+  @VisibleForTesting
+  IAssignedTask mapAndAssignResources(Protos.Offer offer, IAssignedTask task) {
+    IAssignedTask assigned = task;
+    for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
+      if (type.getMapper().isPresent()) {
+        assigned = type.getMapper().get().mapAndAssign(offer, assigned);
+      }
+    }
+    return assigned;
+  }
+
+  private Protos.TaskInfo assign(
+      Storage.MutableStoreProvider storeProvider,
+      Protos.Offer offer,
+      String taskId,
+      boolean revocable) {
+
+    String host = offer.getHostname();
+    IAssignedTask assigned = stateManager.assignTask(
+        storeProvider,
+        taskId,
+        host,
+        offer.getAgentId(),
+        task -> mapAndAssignResources(offer, task));
+    LOG.info(
+        "Offer on agent {} (id {}) is being assigned task for {}.",
+        host, offer.getAgentId().getValue(), taskId);
+    return taskFactory.createFrom(assigned, offer, revocable);
+  }
+
+  private void launchUsingOffer(
+      Storage.MutableStoreProvider storeProvider,
+      boolean revocable,
+      ResourceRequest resourceRequest,
+      IAssignedTask task,
+      HostOffer offer,
+      ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+
+    String taskId = task.getTaskId();
+    Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+    resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+    try {
+      offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+      assignmentResult.add(taskId);
+    } catch (OfferManager.LaunchException e) {
+      LOG.warn("Failed to launch task.", e);
+      launchFailures.incrementAndGet();
+
+      // The attempt to schedule the task failed, so we need to backpedal on the assignment.
+      // It is in the LOST state and a new task will move to PENDING to replace it.
+      // Should the state change fail due to storage issues, that's okay.  The task will
+      // time out in the ASSIGNED state and be moved to LOST.
+      stateManager.changeState(
+          storeProvider,
+          taskId,
+          Optional.of(PENDING),
+          LOST,
+          LAUNCH_FAILED_MSG);
+      throw e;
+    }
+  }
+
+  private Iterable<IAssignedTask> maybeAssignReserved(
+      Iterable<IAssignedTask> tasks,
+      Storage.MutableStoreProvider storeProvider,
+      boolean revocable,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      ImmutableSet.Builder<String> assignmentResult) {
+
+    if (!updateAgentReserver.hasReservations(groupKey)) {
+      return tasks;
+    }
+
+    // Data structure to record which tasks should be excluded from the regular (non-reserved)
+    // scheduling loop. This is important because we release reservations once they are used,
+    // so we need to record them separately to avoid them being double-scheduled.
+    ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
+
+    for (IAssignedTask task : tasks) {
+      IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+      Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
+      if (maybeAgentId.isPresent()) {
+        excludeBuilder.add(key);
+        Optional<HostOffer> offer = offerManager.getMatching(
+            Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
+            resourceRequest,
+            revocable);
+        if (offer.isPresent()) {
+          try {
+            // The offer can still be veto'd because of changed constraints, or because the
+            // Scheduler hasn't been updated by Mesos yet...
+            launchUsingOffer(storeProvider,
+                revocable,
+                resourceRequest,
+                task,
+                offer.get(),
+                assignmentResult);
+            LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
+            updateAgentReserver.release(maybeAgentId.get(), key);
+          } catch (OfferManager.LaunchException e) {
+            updateAgentReserver.release(maybeAgentId.get(), key);
+          }
+        } else {
+          LOG.info(
+              "Tried to reuse offer on {} for {}, but was not ready yet.",
+              maybeAgentId.get(),
+              key);
+        }
+      }
+    }
+
+    // Return only the tasks that didn't have reservations. Offers on agents that were reserved
+    // might not have been seen by Aurora yet, so we need to wait until the reservation expires
+    // before giving up and falling back to the first-fit algorithm.
+    Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
+    return Iterables.filter(tasks, t -> !toBeExcluded.contains(
+        InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+  }
+
+  /**
+   * Determine whether or not the offer is reserved for a different task via preemption or
+   * update affinity.
+   */
+  @SuppressWarnings("PMD.UselessParentheses")  // TODO(jly): PMD bug, remove when upgrade from 5.5.3
+  private boolean isAgentReserved(HostOffer offer,
+                                  TaskGroupKey groupKey,
+                                  Map<String, TaskGroupKey> preemptionReservations) {
+
+    String agentId = offer.getOffer().getAgentId().getValue();
+    Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
+        preemptionReservations.get(agentId));
+
+    return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
+        || !updateAgentReserver.getReservations(agentId).isEmpty();
+  }
+
+  @Timed("assigner_maybe_assign")
+  @Override
+  public Set<String> maybeAssign(
+      Storage.MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Iterable<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations) {
+
+    if (Iterables.isEmpty(tasks)) {
+      return ImmutableSet.of();
+    }
+
+    boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
+    ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+
+    // Assign tasks reserved for a specific agent (e.g. for update affinity)
+    Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
+        tasks,
+        storeProvider,
+        revocable,
+        resourceRequest,
+        groupKey,
+        assignmentResult);
+
+    // Assign the rest of the non-reserved tasks
+    for (IAssignedTask task : nonReservedTasks) {
+      try {
+        // Get all offers that will satisfy the given ResourceRequest and that are not reserved
+        // for updates or preemption
+        FluentIterable<HostOffer> matchingOffers = FluentIterable
+            .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
+            .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+
+        // Determine which is the optimal offer to select for the given request
+        Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
+
+        // If no offer is chosen, continue to the next task
+        if (!optionalOffer.isPresent()) {
+          continue;
+        }
+
+        // Attempt to launch the task using the chosen offer
+        HostOffer offer = optionalOffer.get();
+        launchUsingOffer(storeProvider,
+            revocable,
+            resourceRequest,
+            task,
+            offer,
+            assignmentResult);
+      } catch (OfferManager.LaunchException e) {
+        // Any launch exception causes the scheduling round to terminate for this TaskGroup.
+        break;
+      }
+    }
+
+    return assignmentResult.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
new file mode 100644
index 0000000..2ddd4f5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.List;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+
+/**
+ * The default TaskAssigner implementation that allows the injection of custom offer
+ * selecting modules via the '-offer_selector_modules' flag.
+ */
+public class TaskAssignerImplModule extends AbstractModule {
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-offer_selector_modules",
+        description = "Guice module for customizing the TaskAssignerImpl's OfferSelector.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerSelectorModules =
+        ImmutableList.of(FirstFitOfferSelectorModule.class);
+  }
+
+  private final CliOptions cliOptions;
+
+  public TaskAssignerImplModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    Options options = cliOptions.taskAssigner;
+    for (Module module : MoreModules.instantiateAll(options.offerSelectorModules, cliOptions)) {
+      install(module);
+    }
+
+    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
+    bind(TaskAssignerImpl.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 0002b0c..3c38f95 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -13,55 +13,10 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.preemptor.BiCache;
-import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static java.util.stream.Collectors.toMap;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
 /**
  * Enables scheduling and preemption of tasks.
@@ -77,150 +32,4 @@ public interface TaskScheduler extends EventSubscriber {
    *         task ID was not present in the result.
    */
   Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
-
-  /**
-   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
-   * backs off after a failed scheduling attempt.
-   * <p>
-   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
-   */
-  class TaskSchedulerImpl implements TaskScheduler {
-    /**
-     * Binding annotation for the time duration of reservations.
-     */
-    @VisibleForTesting
-    @Qualifier
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    public @interface ReservationDuration { }
-
-    private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class);
-
-    private final TaskAssigner assigner;
-    private final Preemptor preemptor;
-    private final ExecutorSettings executorSettings;
-    private final BiCache<String, TaskGroupKey> reservations;
-
-    private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
-    private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
-    private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
-
-    @Inject
-    TaskSchedulerImpl(
-        TaskAssigner assigner,
-        Preemptor preemptor,
-        ExecutorSettings executorSettings,
-        BiCache<String, TaskGroupKey> reservations) {
-
-      this.assigner = requireNonNull(assigner);
-      this.preemptor = requireNonNull(preemptor);
-      this.executorSettings = requireNonNull(executorSettings);
-      this.reservations = requireNonNull(reservations);
-    }
-
-    @Timed ("task_schedule_attempt")
-    public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) {
-      try {
-        return scheduleTasks(store, taskIds);
-      } catch (RuntimeException e) {
-        // We catch the generic unchecked exception here to ensure tasks are not abandoned
-        // if there is a transient issue resulting in an unchecked exception.
-        LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
-        attemptsFailed.incrementAndGet();
-        // Return empty set for all task IDs to be retried later.
-        // It's ok if some tasks were already assigned, those will be ignored in the next round.
-        return ImmutableSet.of();
-      }
-    }
-
-    private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) {
-      ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
-      String taskIdValues = Joiner.on(",").join(taskIds);
-      LOG.debug("Attempting to schedule tasks {}", taskIdValues);
-      ImmutableSet<IAssignedTask> assignedTasks =
-          ImmutableSet.copyOf(Iterables.transform(
-              store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
-              IScheduledTask::getAssignedTask));
-
-      if (Iterables.isEmpty(assignedTasks)) {
-        LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
-        return taskIds;
-      }
-
-      Preconditions.checkState(
-          assignedTasks.stream()
-              .collect(Collectors.groupingBy(t -> t.getTask()))
-              .entrySet()
-              .size() == 1,
-          "Found multiple task groups for %s",
-          taskIdValues);
-
-      Map<String, IAssignedTask> assignableTaskMap =
-          assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
-
-      if (taskIds.size() != assignedTasks.size()) {
-        LOG.warn("Failed to look up tasks "
-            + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
-      }
-
-      // This is safe after all checks above.
-      ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
-      AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-
-      // Valid Docker tasks can have a container but no executor config
-      ResourceBag overhead = ResourceBag.EMPTY;
-      if (task.isSetExecutorConfig()) {
-        overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-            .orElseThrow(
-                () -> new IllegalArgumentException("Cannot find executor configuration"));
-      }
-
-      Set<String> launched = assigner.maybeAssign(
-          store,
-          new ResourceRequest(
-              task,
-              bagFromResources(task.getResources()).add(overhead), aggregate),
-          TaskGroupKey.from(task),
-          assignedTasks,
-          reservations.asMap());
-
-      attemptsFired.addAndGet(assignableTaskMap.size());
-      Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
-
-      failedToLaunch.forEach(taskId -> {
-        // Task could not be scheduled.
-        // TODO(maxim): Now that preemption slots are searched asynchronously, consider
-        // retrying a launch attempt within the current scheduling round IFF a reservation is
-        // available.
-        maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
-      });
-      attemptsNoMatch.addAndGet(failedToLaunch.size());
-
-      // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
-      return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
-    }
-
-    private void maybePreemptFor(
-        IAssignedTask task,
-        AttributeAggregate jobState,
-        MutableStoreProvider storeProvider) {
-
-      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
-        return;
-      }
-      Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
-      if (slaveId.isPresent()) {
-        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
-      }
-    }
-
-    @Subscribe
-    public void taskChanged(final TaskStateChange stateChangeEvent) {
-      if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
-        IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
-        if (assigned.getSlaveId() != null) {
-          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
new file mode 100644
index 0000000..b6d5d95
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
+
+/**
+ * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
+ * backs off after a failed scheduling attempt.
+ * <p>
+ * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+ */
+public class TaskSchedulerImpl implements TaskScheduler {
+  /**
+   * Binding annotation for the time duration of reservations.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface ReservationDuration { }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.class);
+
+  private final TaskAssigner assigner;
+  private final Preemptor preemptor;
+  private final ExecutorSettings executorSettings;
+  private final BiCache<String, TaskGroupKey> reservations;
+
+  private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
+  private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
+  private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
+
+  @Inject
+  TaskSchedulerImpl(
+      TaskAssigner assigner,
+      Preemptor preemptor,
+      ExecutorSettings executorSettings,
+      BiCache<String, TaskGroupKey> reservations) {
+
+    this.assigner = requireNonNull(assigner);
+    this.preemptor = requireNonNull(preemptor);
+    this.executorSettings = requireNonNull(executorSettings);
+    this.reservations = requireNonNull(reservations);
+  }
+
+  @Timed("task_schedule_attempt")
+  public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+    try {
+      return scheduleTasks(store, taskIds);
+    } catch (RuntimeException e) {
+      // We catch the generic unchecked exception here to ensure tasks are not abandoned
+      // if there is a transient issue resulting in an unchecked exception.
+      LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
+      attemptsFailed.incrementAndGet();
+      // Return empty set for all task IDs to be retried later.
+      // It's ok if some tasks were already assigned, those will be ignored in the next round.
+      return ImmutableSet.of();
+    }
+  }
+
+  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
+    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
+    String taskIdValues = Joiner.on(",").join(taskIds);
+    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
+    ImmutableSet<IAssignedTask> assignedTasks =
+        ImmutableSet.copyOf(Iterables.transform(
+            store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
+            IScheduledTask::getAssignedTask));
+
+    if (Iterables.isEmpty(assignedTasks)) {
+      LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
+      return taskIds;
+    }
+
+    Preconditions.checkState(
+        assignedTasks.stream()
+            .collect(Collectors.groupingBy(t -> t.getTask()))
+            .entrySet()
+            .size() == 1,
+        "Found multiple task groups for %s",
+        taskIdValues);
+
+    Map<String, IAssignedTask> assignableTaskMap =
+        assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+
+    if (taskIds.size() != assignedTasks.size()) {
+      LOG.warn("Failed to look up tasks "
+          + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+    }
+
+    // This is safe after all checks above.
+    ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+    AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
+
+    // Valid Docker tasks can have a container but no executor config
+    ResourceBag overhead = ResourceBag.EMPTY;
+    if (task.isSetExecutorConfig()) {
+      overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+          .orElseThrow(
+              () -> new IllegalArgumentException("Cannot find executor configuration"));
+    }
+
+    Set<String> launched = assigner.maybeAssign(
+        store,
+        new SchedulingFilter.ResourceRequest(
+            task,
+            bagFromResources(task.getResources()).add(overhead), aggregate),
+        TaskGroupKey.from(task),
+        assignedTasks,
+        reservations.asMap());
+
+    attemptsFired.addAndGet(assignableTaskMap.size());
+    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+
+    failedToLaunch.forEach(taskId -> {
+      // Task could not be scheduled.
+      // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+      // retrying a launch attempt within the current scheduling round IFF a reservation is
+      // available.
+      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+    });
+    attemptsNoMatch.addAndGet(failedToLaunch.size());
+
+    // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
+    return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+  }
+
+  private void maybePreemptFor(
+      IAssignedTask task,
+      AttributeAggregate jobState,
+      Storage.MutableStoreProvider storeProvider) {
+
+    if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+      return;
+    }
+    Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
+    if (slaveId.isPresent()) {
+      reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+    }
+  }
+
+  @Subscribe
+  public void taskChanged(final PubsubEvent.TaskStateChange stateChangeEvent) {
+    if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
+      IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
+      if (assigned.getSlaveId() != null) {
+        reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
deleted file mode 100644
index dc244ee..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-
-/**
- *  Exposes the default TaskAssigner implementation, which is a first-fit scheduling algorithm.
- */
-public class FirstFitTaskAssignerModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    bind(TaskAssigner.class).to(FirstFitTaskAssigner.class);
-    bind(FirstFitTaskAssigner.class).in(Singleton.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index b7a3c0b..46e9227 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
 import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
@@ -42,7 +43,7 @@ public class StateModule extends AbstractModule {
     @Parameter(names = "-task_assigner_modules",
         description = "Guice modules for customizing task assignment.")
     @SuppressWarnings("rawtypes")
-    public List<Class> taskAssignerModules = ImmutableList.of(FirstFitTaskAssignerModule.class);
+    public List<Class> taskAssignerModules = ImmutableList.of(TaskAssignerImplModule.class);
   }
 
   private final CliOptions options;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
deleted file mode 100644
index cdd0d15..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-
-/**
- * Responsible for matching a task against an offer and launching it.
- */
-public interface TaskAssigner {
-  /**
-   * Tries to match a task against an offer.  If a match is found, the assigner makes the
-   * appropriate changes to the task and requests task launch.
-   *
-   * @param storeProvider Storage provider.
-   * @param resourceRequest The request for resources being scheduled.
-   * @param groupKey Task group key.
-   * @param tasks Tasks to assign.
-   * @param preemptionReservations Slave reservations.
-   * @return Successfully assigned task IDs.
-   */
-  Set<String> maybeAssign(
-      MutableStoreProvider storeProvider,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
-      Map<String, TaskGroupKey> preemptionReservations);
-
-  class FirstFitTaskAssigner implements TaskAssigner {
-    private static final Logger LOG = LoggerFactory.getLogger(FirstFitTaskAssigner.class);
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-    @VisibleForTesting
-    static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
-    @VisibleForTesting
-    static final String ASSIGNER_EVALUATED_OFFERS = "assigner_evaluated_offers";
-
-    private final AtomicLong launchFailures;
-    private final AtomicLong evaluatedOffers;
-
-    private final StateManager stateManager;
-    private final SchedulingFilter filter;
-    private final MesosTaskFactory taskFactory;
-    private final OfferManager offerManager;
-    private final TierManager tierManager;
-    private final UpdateAgentReserver updateAgentReserver;
-
-    @Inject
-    public FirstFitTaskAssigner(
-        StateManager stateManager,
-        SchedulingFilter filter,
-        MesosTaskFactory taskFactory,
-        OfferManager offerManager,
-        TierManager tierManager,
-        UpdateAgentReserver updateAgentReserver,
-        StatsProvider statsProvider) {
-
-      this.stateManager = requireNonNull(stateManager);
-      this.filter = requireNonNull(filter);
-      this.taskFactory = requireNonNull(taskFactory);
-      this.offerManager = requireNonNull(offerManager);
-      this.tierManager = requireNonNull(tierManager);
-      this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
-      this.evaluatedOffers = statsProvider.makeCounter(ASSIGNER_EVALUATED_OFFERS);
-      this.updateAgentReserver = requireNonNull(updateAgentReserver);
-    }
-
-    @VisibleForTesting
-    IAssignedTask mapAndAssignResources(Offer offer, IAssignedTask task) {
-      IAssignedTask assigned = task;
-      for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
-        if (type.getMapper().isPresent()) {
-          assigned = type.getMapper().get().mapAndAssign(offer, assigned);
-        }
-      }
-      return assigned;
-    }
-
-    private TaskInfo assign(
-        MutableStoreProvider storeProvider,
-        Offer offer,
-        String taskId,
-        boolean revocable) {
-
-      String host = offer.getHostname();
-      IAssignedTask assigned = stateManager.assignTask(
-          storeProvider,
-          taskId,
-          host,
-          offer.getAgentId(),
-          task -> mapAndAssignResources(offer, task));
-      LOG.info(
-          "Offer on agent {} (id {}) is being assigned task for {}.",
-          host, offer.getAgentId().getValue(), taskId);
-      return taskFactory.createFrom(assigned, offer, revocable);
-    }
-
-    private boolean evaluateOffer(
-        MutableStoreProvider storeProvider,
-        boolean revocable,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        IAssignedTask task,
-        HostOffer offer,
-        ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
-
-      String taskId = task.getTaskId();
-      Set<Veto> vetoes = filter.filter(
-          new UnusedResource(
-              offer.getResourceBag(revocable),
-              offer.getAttributes(),
-              offer.getUnavailabilityStart()),
-          resourceRequest);
-
-      if (vetoes.isEmpty()) {
-        TaskInfo taskInfo = assign(
-            storeProvider,
-            offer.getOffer(),
-            taskId,
-            revocable);
-        resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
-
-        try {
-          offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-          assignmentResult.add(taskId);
-          return true;
-        } catch (OfferManager.LaunchException e) {
-          LOG.warn("Failed to launch task.", e);
-          launchFailures.incrementAndGet();
-
-          // The attempt to schedule the task failed, so we need to backpedal on the
-          // assignment.
-          // It is in the LOST state and a new task will move to PENDING to replace it.
-          // Should the state change fail due to storage issues, that's okay.  The task will
-          // time out in the ASSIGNED state and be moved to LOST.
-          stateManager.changeState(
-              storeProvider,
-              taskId,
-              Optional.of(PENDING),
-              LOST,
-              LAUNCH_FAILED_MSG);
-          throw e;
-        }
-      } else {
-        if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
-          // Never attempt to match this offer/groupKey pair again.
-          offerManager.banOfferForTaskGroup(offer.getOffer().getId(), groupKey);
-        }
-        LOG.debug("Agent {} vetoed task {}: {}", offer.getOffer().getHostname(), taskId, vetoes);
-      }
-      return false;
-    }
-
-    private Iterable<IAssignedTask> maybeAssignReserved(
-        Iterable<IAssignedTask> tasks,
-        MutableStoreProvider storeProvider,
-        boolean revocable,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        ImmutableSet.Builder<String> assignmentResult) {
-
-      if (!updateAgentReserver.hasReservations(groupKey)) {
-        return tasks;
-      }
-
-      // Data structure to record which tasks should be excluded from the regular (non-reserved)
-      // scheduling loop. This is important because we release reservations once they are used,
-      // so we need to record them separately to avoid them being double-scheduled.
-      ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
-      for (IAssignedTask task: tasks) {
-        IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
-        Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
-        if (maybeAgentId.isPresent()) {
-          excludeBuilder.add(key);
-          Optional<HostOffer> offer = offerManager.getOffer(
-              Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build());
-          if (offer.isPresent()) {
-            try {
-              // The offer can still be veto'd because of changed constraints, or because the
-              // Scheduler hasn't been updated by Mesos yet...
-              if (evaluateOffer(
-                  storeProvider,
-                  revocable,
-                  resourceRequest,
-                  groupKey,
-                  task,
-                  offer.get(),
-                  assignmentResult)) {
-
-                LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
-                updateAgentReserver.release(maybeAgentId.get(), key);
-              } else {
-                LOG.info(
-                    "Tried to reuse offer on {} for {}, but was not ready yet.",
-                    maybeAgentId.get(),
-                    key);
-              }
-            } catch (OfferManager.LaunchException e) {
-              updateAgentReserver.release(maybeAgentId.get(), key);
-            }
-          }
-        }
-      }
-
-      // Return only the tasks that didn't have reservations. Offers on agents that were reserved
-      // might not have been seen by Aurora yet, so we need to wait until the reservation expires
-      // before giving up and falling back to the first-fit algorithm.
-      Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
-      return Iterables.filter(tasks, t -> !toBeExcluded.contains(
-          InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
-    }
-
-    @Timed("assigner_maybe_assign")
-    @Override
-    public Set<String> maybeAssign(
-        MutableStoreProvider storeProvider,
-        ResourceRequest resourceRequest,
-        TaskGroupKey groupKey,
-        Iterable<IAssignedTask> tasks,
-        Map<String, TaskGroupKey> preemptionReservations) {
-
-      if (Iterables.isEmpty(tasks)) {
-        return ImmutableSet.of();
-      }
-
-      boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
-      ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
-
-      Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
-          tasks,
-          storeProvider,
-          revocable,
-          resourceRequest,
-          groupKey,
-          assignmentResult);
-
-      Iterator<IAssignedTask> remainingTasks = nonReservedTasks.iterator();
-      // Make sure we still have tasks to process after reservations are processed.
-      if (remainingTasks.hasNext()) {
-        IAssignedTask task = remainingTasks.next();
-        for (HostOffer offer : offerManager.getOffers(groupKey)) {
-
-          if (!offer.hasCpuAndMem()) {
-            // This offer lacks any type of CPU or mem resource, and therefore will never match
-            // a task.
-            continue;
-          }
-
-          String agentId = offer.getOffer().getAgentId().getValue();
-
-          Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
-              preemptionReservations.get(agentId));
-
-          if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) {
-            // This slave is reserved for a different task group -> skip.
-            continue;
-          }
-
-          if (!updateAgentReserver.getReservations(agentId).isEmpty()) {
-            // This agent has been reserved for an update in-progress, skip.
-            continue;
-          }
-
-          evaluatedOffers.incrementAndGet();
-          try {
-            boolean offerUsed = evaluateOffer(
-                storeProvider, revocable, resourceRequest, groupKey, task, offer, assignmentResult);
-            if (offerUsed) {
-              if (remainingTasks.hasNext()) {
-                task = remainingTasks.next();
-              } else {
-                break;
-              }
-            }
-          } catch (OfferManager.LaunchException e) {
-            break;
-          }
-        }
-      }
-
-      return assignmentResult.build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 6033c01..e629093 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -157,7 +157,7 @@ public class AsyncStatsModule extends AbstractModule {
 
     @Override
     public Iterable<MachineResource> get() {
-      Iterable<HostOffer> offers = offerManager.getOffers();
+      Iterable<HostOffer> offers = offerManager.getAll();
 
       ImmutableList.Builder<MachineResource> builder = ImmutableList.builder();
       for (HostOffer offer : offers) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 0ec4de6..5cb5310 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -160,6 +160,7 @@ public class CommandLineTest {
     expected.scheduling.reservationDuration = TEST_TIME;
     expected.scheduling.schedulingMaxBatchSize = 42;
     expected.scheduling.maxTasksPerScheduleAttempt = 42;
+    expected.taskAssigner.offerSelectorModules = ImmutableList.of(NoopModule.class);
     expected.async.asyncWorkerThreads = 42;
     expected.zk.inProcess = true;
     expected.zk.zkEndpoints = ImmutableList.of(InetSocketAddress.createUnresolved("testing", 42));
@@ -266,11 +267,11 @@ public class CommandLineTest {
         "-hold_offers_forever=true",
         "-min_offer_hold_time=42days",
         "-offer_hold_jitter_window=42days",
-        "-offer_static_ban_cache_max_size=42",
         "-offer_filter_duration=42days",
         "-unavailability_threshold=42days",
         "-offer_order=CPU,DISK",
         "-offer_order_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
+        "-offer_static_ban_cache_max_size=42",
         "-custom_executor_config=" + tempFile.getAbsolutePath(),
         "-thermos_executor_path=testing",
         "-thermos_executor_resources=testing",
@@ -306,6 +307,7 @@ public class CommandLineTest {
         "-offer_reservation_duration=42days",
         "-scheduling_max_batch_size=42",
         "-max_tasks_per_schedule_attempt=42",
+        "-offer_selector_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
         "-async_worker_threads=42",
         "-zk_in_proc=true",
         "-zk_endpoints=testing:42",

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
index 3069959..549d2e3 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
@@ -49,7 +49,7 @@ public class OffersTest extends EasyMockTest {
 
   @Test
   public void testNoOffers() throws Exception {
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.of());
+    expect(offerManager.getAll()).andReturn(ImmutableSet.of());
 
     control.replay();
 
@@ -134,7 +134,7 @@ public class OffersTest extends EasyMockTest {
             .build(),
         IHostAttributes.build(new HostAttributes().setMode(NONE)));
 
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.of(offer));
+    expect(offerManager.getAll()).andReturn(ImmutableSet.of(offer));
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index 45ae6bb..64efc0d 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -259,7 +259,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
   public void testOffers() {
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER);
+    offerManager.add(HOST_OFFER);
 
     control.replay();
 
@@ -272,8 +272,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
     expectOfferAttributesSaved(HOST_OFFER_2);
-    offerManager.addOffer(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER_2);
+    offerManager.add(HOST_OFFER);
+    offerManager.add(HOST_OFFER_2);
 
     control.replay();
 
@@ -296,7 +296,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
 
     // If the host is in draining, then the offer manager should get an offer with that attribute
-    offerManager.addOffer(DRAINING_HOST_OFFER);
+    offerManager.add(DRAINING_HOST_OFFER);
 
     control.replay();
     handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
@@ -316,7 +316,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
 
   @Test
   public void testRescind() {
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(true);
 
     control.replay();
 
@@ -336,12 +336,12 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     FakeScheduledThreadPoolExecutor fakeExecutor = new FakeScheduledThreadPoolExecutor();
     createHandler(false, fakeExecutor);
 
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(false);
-    offerManager.banOffer(OFFER_ID);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(false);
+    offerManager.ban(OFFER_ID);
     storageUtil.expectOperations();
     expectOfferAttributesSaved(HOST_OFFER);
-    offerManager.addOffer(HOST_OFFER);
-    expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+    offerManager.add(HOST_OFFER);
+    expect(offerManager.cancel(OFFER_ID)).andReturn(true);
 
     control.replay();
     replay(offerManager);
@@ -355,7 +355,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     // Eventually, we unban the offer.
     handler.handleRescind(OFFER_ID);
 
-    // 2 commands executed (addOffer and unbanOffer).
+    // 2 commands executed (add and unbanOffer).
     fakeExecutor.advance();
     fakeExecutor.advance();
 


[3/3] aurora git commit: Enable custom offer scoring modules for task assignment

Posted by wf...@apache.org.
Enable custom offer scoring modules for task assignment

Major portions of the refactor:

* Refactor `OfferManager` to do filtering of offers (added `getMatching` and
  `getAllMatching` methods) as opposed to TaskAssigner
* Refactor `TaskAssigner`, allow for injection of custom "scoring" class
  through `OfferRanker` interface

And some minor things as well:

* Moved `TaskAssignerImpl`, `TaskSchedulerImpl`, and `HostOffers` into their own
  upper-level classes
* Moved `TaskAssigner` to the `scheduling` package and out of the `state` package
* Renamed some methods in `OfferManager` to avoid code stutter
* Renaming of some classes (e.g. `FirstFitTaskAssigner` -> `TaskAssignerImpl`)
* And a slew of others

Reviewed at https://reviews.apache.org/r/63973/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/80139da4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/80139da4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/80139da4

Branch: refs/heads/master
Commit: 80139da4624916e406c7e80c4ea2d286d4d859c3
Parents: 21af250
Author: Jordan Ly <jo...@gmail.com>
Authored: Tue Nov 28 11:02:14 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Nov 28 11:02:14 2017 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   3 +
 docs/reference/scheduler-configuration.md       |   2 +-
 .../org/apache/aurora/benchmark/Offers.java     |   2 +-
 .../aurora/benchmark/SchedulingBenchmarks.java  |  16 +-
 .../benchmark/fakes/FakeOfferManager.java       |  31 +-
 .../apache/aurora/scheduler/app/AppModule.java  |   4 +-
 .../aurora/scheduler/config/CliOptions.java     |   6 +-
 .../scheduler/filter/AttributeAggregate.java    |  38 +-
 .../scheduler/filter/SchedulingFilter.java      |   5 +
 .../scheduler/filter/SchedulingFilterImpl.java  |   2 +-
 .../apache/aurora/scheduler/http/Offers.java    |   2 +-
 .../scheduler/mesos/MesosCallbackHandler.java   |  12 +-
 .../aurora/scheduler/offers/HostOffers.java     | 253 +++++++++
 .../aurora/scheduler/offers/OfferManager.java   | 404 ++------------
 .../scheduler/offers/OfferManagerImpl.java      | 246 +++++++++
 .../scheduler/offers/OfferManagerModule.java    | 211 ++++++++
 .../aurora/scheduler/offers/OfferSettings.java  |   7 +-
 .../aurora/scheduler/offers/OffersModule.java   | 211 --------
 .../preemptor/PendingTaskProcessor.java         |   2 +-
 .../aurora/scheduler/preemptor/Preemptor.java   |   2 +-
 .../scheduling/FirstFitOfferSelector.java       |  29 +
 .../scheduling/FirstFitOfferSelectorModule.java |  26 +
 .../scheduler/scheduling/OfferSelector.java     |  36 ++
 .../scheduler/scheduling/SchedulingModule.java  |   4 +-
 .../scheduler/scheduling/TaskAssigner.java      |  46 ++
 .../scheduler/scheduling/TaskAssignerImpl.java  | 284 ++++++++++
 .../scheduling/TaskAssignerImplModule.java      |  59 ++
 .../scheduler/scheduling/TaskScheduler.java     | 191 -------
 .../scheduler/scheduling/TaskSchedulerImpl.java | 207 +++++++
 .../state/FirstFitTaskAssignerModule.java       |  31 --
 .../aurora/scheduler/state/StateModule.java     |   3 +-
 .../aurora/scheduler/state/TaskAssigner.java    | 338 ------------
 .../scheduler/stats/AsyncStatsModule.java       |   2 +-
 .../scheduler/config/CommandLineTest.java       |   4 +-
 .../aurora/scheduler/http/OffersTest.java       |   4 +-
 .../mesos/MesosCallbackHandlerTest.java         |  20 +-
 .../scheduler/offers/OfferManagerImplTest.java  | 381 +++++++++----
 .../preemptor/PendingTaskProcessorTest.java     |   2 +-
 .../scheduler/preemptor/PreemptorImplTest.java  |   2 +-
 .../preemptor/PreemptorModuleTest.java          |   2 +-
 .../scheduling/FirstFitOfferSelectorTest.java   |  66 +++
 .../scheduling/TaskAssignerImplTest.java        | 374 +++++++++++++
 .../scheduling/TaskSchedulerImplTest.java       |   2 -
 .../state/FirstFitTaskAssignerTest.java         | 539 -------------------
 .../scheduler/stats/AsyncStatsModuleTest.java   |   2 +-
 45 files changed, 2244 insertions(+), 1869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2d3c423..54dcc75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -12,6 +12,9 @@
   a production cluster. For that reason, the functionality is behind a new flag `-partition_aware`
   that is disabled by default. When Mesos support is improved and the new behavior is vetted in
   production clusters, we'll enable this by default.
+- Added the ability to "score" offers for a given scheduling assignment via the `OfferSelector`
+  interface. The default implementation is first fit, but cluster operators can inject a custom
+  scoring algorithm through the `-offer_selector_modules` flag.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 6c385b5..f697b6f 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -222,7 +222,7 @@ Optional flags:
 	Time for a stat to be retained in memory before expiring.
 -stat_sampling_interval (default (1, secs))
 	Statistic value sampling interval.
--task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule])
+-task_assigner_modules (default [class org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule])
   Guice modules for replacing task assignment logic.
 -thermos_executor_cpu (default 0.25)
 	The number of CPU cores to allocate for each instance of the executor.

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index 2b46326..2fcc804 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -50,7 +50,7 @@ final class Offers {
    */
   static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) {
     for (HostOffer offer : offers) {
-      offerManager.addOffer(offer);
+      offerManager.add(offer);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 1708a50..58e3224 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -62,15 +62,17 @@ import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.aurora.scheduler.offers.Deferment;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.offers.OfferOrder;
 import org.apache.aurora.scheduler.offers.OfferSettings;
-import org.apache.aurora.scheduler.offers.OffersModule;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -147,8 +149,8 @@ public class SchedulingBenchmarks {
               bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class)
                   .toInstance(new NoopExecutor());
               bind(Deferment.class).to(Deferment.Noop.class);
-              bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
-              bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+              bind(OfferManager.class).to(OfferManagerImpl.class);
+              bind(OfferManagerImpl.class).in(Singleton.class);
               bind(OfferSettings.class).toInstance(
                   new OfferSettings(NO_DELAY,
                       ImmutableList.of(OfferOrder.RANDOM),
@@ -157,8 +159,8 @@ public class SchedulingBenchmarks {
                       new FakeTicker()));
               bind(BiCache.BiCacheSettings.class).toInstance(
                   new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
-              bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
-              bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+              bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+              bind(TaskSchedulerImpl.class).in(Singleton.class);
               expose(TaskScheduler.class);
               expose(OfferManager.class);
             }
@@ -171,7 +173,7 @@ public class SchedulingBenchmarks {
                   .toInstance(DELAY_FOREVER);
               bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
               bind(new TypeLiteral<Amount<Long, Time>>() { })
-                  .annotatedWith(OffersModule.UnavailabilityThreshold.class)
+                  .annotatedWith(OfferManagerModule.UnavailabilityThreshold.class)
                   .toInstance(Amount.of(1L, Time.MINUTES));
               bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
               bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index 201aa81..05c58ab 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -18,22 +18,23 @@ import com.google.common.base.Optional;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.mesos.v1.Protos;
 
 public class FakeOfferManager implements OfferManager {
   @Override
-  public void addOffer(HostOffer offer) {
+  public void add(HostOffer offer) {
     // no-op
   }
 
   @Override
-  public boolean cancelOffer(Protos.OfferID offerId) {
+  public boolean cancel(Protos.OfferID offerId) {
     return false;
   }
 
   @Override
-  public void banOffer(Protos.OfferID offerId) {
+  public void ban(Protos.OfferID offerId) {
     // no-op
   }
 
@@ -43,27 +44,33 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public void banOfferForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
     // no-op
   }
 
   @Override
-  public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
-    return null;
+  public Optional<HostOffer> get(Protos.AgentID agentId) {
+    return Optional.absent();
   }
 
   @Override
-  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
-    // no-op
+  public Iterable<HostOffer> getAll() {
+    return null;
   }
 
   @Override
-  public Iterable<HostOffer> getOffers() {
-    return null;
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                         ResourceRequest resourceRequest,
+                                         boolean revocable) {
+
+    return Optional.absent();
   }
 
   @Override
-  public Optional<HostOffer> getOffer(Protos.AgentID agentId) {
-    return Optional.absent();
+  public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                            ResourceRequest resourceRequest,
+                                            boolean revocable) {
+
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 3204cca..817a019 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,7 +47,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
 import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.pruning.PruningModule;
 import org.apache.aurora.scheduler.quota.QuotaModule;
@@ -172,7 +172,7 @@ public class AppModule extends AbstractModule {
 
     install(new PubsubEventModule());
     install(new AsyncModule(options.async));
-    install(new OffersModule(options));
+    install(new OfferManagerModule(options));
     install(new PruningModule(options.pruning));
     install(new ReconciliationModule(options.reconciliation));
     install(new SchedulingModule(options.scheduling));

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index d4537e3..b7f43e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -36,12 +36,13 @@ import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule;
 import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
 import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.pruning.PruningModule;
 import org.apache.aurora.scheduler.reconciliation.ReconciliationModule;
 import org.apache.aurora.scheduler.resources.ResourceSettings;
 import org.apache.aurora.scheduler.scheduling.SchedulingModule;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
 import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
@@ -54,7 +55,7 @@ import org.apache.aurora.scheduler.updater.UpdaterModule;
 public class CliOptions {
   public final ReconciliationModule.Options reconciliation =
       new ReconciliationModule.Options();
-  public final OffersModule.Options offer = new OffersModule.Options();
+  public final OfferManagerModule.Options offer = new OfferManagerModule.Options();
   public final ExecutorModule.Options executor = new ExecutorModule.Options();
   public final AppModule.Options app = new AppModule.Options();
   public final SchedulerMain.Options main = new SchedulerMain.Options();
@@ -84,6 +85,7 @@ public class CliOptions {
   public final StatsModule.Options stats = new StatsModule.Options();
   public final CronModule.Options cron = new CronModule.Options();
   public final ResourceSettings resourceSettings = new ResourceSettings();
+  public final TaskAssignerImplModule.Options taskAssigner = new TaskAssignerImplModule.Options();
   final List<Object> custom;
 
   public CliOptions() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 60f141d..a5acafa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -49,19 +49,8 @@ public final class AttributeAggregate {
    */
   private Supplier<Multiset<Pair<String, String>>> aggregate;
 
-  private boolean isInitialized = false;
-
   private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
-    this.aggregate = Suppliers.memoize(
-        () -> {
-          initialize();
-          return aggregate.get();
-        }
-    );
-  }
-
-  private void initialize() {
-    isInitialized = true; // inlining this assignment yields a PMD false positive
+    this.aggregate = Suppliers.memoize(aggregate);
   }
 
   /**
@@ -123,21 +112,16 @@ public final class AttributeAggregate {
   }
 
   public void updateAttributeAggregate(IHostAttributes attributes) {
-    // If the aggregate supplier has not been populated there is no need to update it here.
-    // All tasks attributes will be picked up by the wrapped task query if executed at a
-    // later point in time.
-    if (isInitialized) {
-      final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
-      aggregate = Suppliers.memoize(
-          () -> {
-            ImmutableMultiset.Builder<Pair<String, String>> builder
-                = new ImmutableMultiset.Builder<>();
-            builder.addAll(previous.get());
-            addAttributes(builder, attributes.getAttributes());
-            return builder.build();
-          }
-      );
-    }
+    final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
+    aggregate = Suppliers.memoize(
+        () -> {
+          ImmutableMultiset.Builder<Pair<String, String>> builder
+              = new ImmutableMultiset.Builder<>();
+          builder.addAll(previous.get());
+          addAttributes(builder, attributes.getAttributes());
+          return builder.build();
+        }
+    );
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 36608a9..a00c095 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -258,6 +259,10 @@ public interface SchedulingFilter {
       this(offer, attributes, Optional.absent());
     }
 
+    public UnusedResource(HostOffer offer, boolean revocable) {
+      this(offer.getResourceBag(revocable), offer.getAttributes(), offer.getUnavailabilityStart());
+    }
+
     public UnusedResource(ResourceBag offer, IHostAttributes attributes, Optional<Instant> start) {
       this.offer = offer;
       this.attributes = attributes;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index df51d4c..41a0764 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold;
+import org.apache.aurora.scheduler.offers.OfferManagerModule.UnavailabilityThreshold;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index f22ca6e..bb92cd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -59,7 +59,7 @@ public class Offers {
   public Response getOffers() throws JsonProcessingException {
     return Response.ok(
         mapper.writeValueAsString(
-            StreamSupport.stream(offerManager.getOffers().spliterator(), false)
+            StreamSupport.stream(offerManager.getAll().spliterator(), false)
                 .map(o -> o.getOffer())
                 .collect(Collectors.toList())))
         .build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index fd5874d..87e702f 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -40,7 +40,7 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -126,7 +126,7 @@ public interface MesosCallbackHandler {
         Driver driver,
         Clock clock,
         MaintenanceController controller,
-        @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
+        @OfferManagerModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
         @PubsubEventModule.RegisteredEvents EventSink registeredEventSink) {
 
       this(
@@ -226,7 +226,7 @@ public interface MesosCallbackHandler {
             storeProvider.getAttributeStore().saveHostAttributes(attributes);
             log.info("Received offer: {}", offer.getId().getValue());
             offersReceived.incrementAndGet();
-            offerManager.addOffer(new HostOffer(offer, attributes));
+            offerManager.add(new HostOffer(offer, attributes));
           }
         });
       });
@@ -244,15 +244,15 @@ public interface MesosCallbackHandler {
       //      In this scenario, we want to ensure that we do not use it/accept it when the executor
       //      finally processes the offer. We will temporarily ban it and add a command for the
       //      executor to unban it so future offers can be processed normally.
-      boolean offerCancelled = offerManager.cancelOffer(offerId);
+      boolean offerCancelled = offerManager.cancel(offerId);
       if (!offerCancelled) {
         log.info(
             "Received rescind before adding offer: {}, temporarily banning.",
             offerId.getValue());
-        offerManager.banOffer(offerId);
+        offerManager.ban(offerId);
         executor.execute(() -> {
           log.info("Cancelling and unbanning offer: {}.", offerId.getValue());
-          offerManager.cancelOffer(offerId);
+          offerManager.cancel(offerId);
         });
       }
       offersRescinded.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
new file mode 100644
index 0000000..8adbcb1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A container for the data structures used by this {@link OfferManagerImpl}, to make it easier to
+ * reason about the different indices used and their consistency.
+ */
+class HostOffers {
+  private final Set<HostOffer> offers;
+
+  private final Map<Protos.OfferID, HostOffer> offersById = Maps.newHashMap();
+  private final Map<Protos.AgentID, HostOffer> offersBySlave = Maps.newHashMap();
+  private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+
+  // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+  // scheduling attempts. See VetoGroup for more details on static ban.
+  private final Cache<Pair<Protos.OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
+  private final SchedulingFilter schedulingFilter;
+
+  // Keep track of globally banned offers that will never be matched to anything.
+  private final Set<Protos.OfferID> globallyBannedOffers = Sets.newHashSet();
+
+  // Keep track of the number of offers evaluated for vetoes when getting matching offers
+  private final AtomicLong vetoEvaluatedOffers;
+
+  HostOffers(StatsProvider statsProvider,
+             OfferSettings offerSettings,
+             SchedulingFilter schedulingFilter) {
+    this.offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
+    this.staticallyBannedOffers = offerSettings
+        .getStaticBanCacheBuilder()
+        .build();
+    this.schedulingFilter = requireNonNull(schedulingFilter);
+
+    // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
+    // Could track this separately if it turns out to pose problems.
+    statsProvider.exportSize(OfferManagerImpl.OUTSTANDING_OFFERS, offers);
+    statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS,
+        staticallyBannedOffers::size);
+    statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS_HIT_RATE,
+        () -> staticallyBannedOffers.stats().hitRate());
+    statsProvider.makeGauge(OfferManagerImpl.GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
+
+    vetoEvaluatedOffers = statsProvider.makeCounter(OfferManagerImpl.VETO_EVALUATED_OFFERS);
+  }
+
+  /**
+   * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+   * agent ID.  If an offer exists with the same agent ID, the existing offer is removed
+   * and returned, and {@code offer} is not added.
+   *
+   * @param offer Offer to add.
+   * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+   *         which will also be removed prior to returning.
+   */
+  synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+    HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+    if (sameAgent != null) {
+      remove(sameAgent.getOffer().getId());
+      return Optional.of(sameAgent);
+    }
+
+    addInternal(offer);
+    return Optional.absent();
+  }
+
+  private void addInternal(HostOffer offer) {
+    offers.add(offer);
+    offersById.put(offer.getOffer().getId(), offer);
+    offersBySlave.put(offer.getOffer().getAgentId(), offer);
+    offersByHost.put(offer.getOffer().getHostname(), offer);
+  }
+
+  synchronized boolean remove(Protos.OfferID id) {
+    HostOffer removed = offersById.remove(id);
+    if (removed != null) {
+      offers.remove(removed);
+      offersBySlave.remove(removed.getOffer().getAgentId());
+      offersByHost.remove(removed.getOffer().getHostname());
+    }
+    globallyBannedOffers.remove(id);
+    return removed != null;
+  }
+
+  synchronized void addGlobalBan(Protos.OfferID offerId) {
+    globallyBannedOffers.add(offerId);
+  }
+
+  synchronized void updateHostAttributes(IHostAttributes attributes) {
+    HostOffer offer = offersByHost.remove(attributes.getHost());
+    if (offer != null) {
+      // Remove and re-add a host's offer to re-sort based on its new hostStatus
+      remove(offer.getOffer().getId());
+      addInternal(new HostOffer(offer.getOffer(), attributes));
+    }
+  }
+
+  synchronized Optional<HostOffer> get(Protos.AgentID slaveId) {
+    HostOffer offer = offersBySlave.get(slaveId);
+    if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
+      return Optional.absent();
+    }
+
+    return Optional.of(offer);
+  }
+
+  /**
+   * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+   * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
+   * will not be modified outside of the returned iterable.
+   *
+   * @return The offers currently known by the scheduler.
+   */
+  synchronized Iterable<HostOffer> getOffers() {
+    return FluentIterable.from(offers)
+        .filter(o -> !globallyBannedOffers.contains(o.getOffer().getId()))
+        .toSet();
+  }
+
+  synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                               ResourceRequest resourceRequest,
+                                               boolean revocable) {
+
+    Optional<HostOffer> optionalOffer = get(slaveId);
+    if (optionalOffer.isPresent()) {
+      HostOffer offer = optionalOffer.get();
+
+      if (isGloballyBanned(offer)
+          || isVetoed(offer, resourceRequest, revocable, Optional.absent())) {
+
+        return Optional.absent();
+      }
+    }
+
+    return optionalOffer;
+  }
+
+  /**
+   * Returns a weakly-consistent iterable giving the available offers to a given
+   * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+   * collection, and may reflect changes that happen after the construction of the iterable.
+   * This property is mainly used in {@code launchTask}.
+   *
+   * @param groupKey The task group to get offers for.
+   * @return The offers a given task group can use.
+   */
+  synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                                  ResourceRequest resourceRequest,
+                                                  boolean revocable) {
+
+    return Iterables.unmodifiableIterable(FluentIterable.from(offers)
+        .filter(o -> !isGloballyBanned(o))
+        .filter(o -> !isStaticallyBanned(o, groupKey))
+        .filter(HostOffer::hasCpuAndMem)
+        .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+  }
+
+  private synchronized boolean isGloballyBanned(HostOffer offer) {
+    return globallyBannedOffers.contains(offer.getOffer().getId());
+  }
+
+  private synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
+    return staticallyBannedOffers.getIfPresent(Pair.of(offer.getOffer().getId(), groupKey)) != null;
+  }
+
+  /**
+   * Determine whether or not the {@link HostOffer} is vetoed for the given {@link ResourceRequest}.
+   * If {@code groupKey} is present, this method will also temporarily ban the offer from ever
+   * matching the {@link TaskGroupKey}.
+   */
+  private boolean isVetoed(HostOffer offer,
+                           ResourceRequest resourceRequest,
+                           boolean revocable,
+                           Optional<TaskGroupKey> groupKey) {
+
+    vetoEvaluatedOffers.incrementAndGet();
+    UnusedResource unusedResource = new UnusedResource(offer, revocable);
+    Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
+    if (!vetoes.isEmpty()) {
+      if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {
+        addStaticGroupBan(offer.getOffer().getId(), groupKey.get());
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  @VisibleForTesting
+  synchronized void addStaticGroupBan(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    if (offersById.containsKey(offerId)) {
+      staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
+    }
+  }
+
+  @VisibleForTesting
+  synchronized Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+    return staticallyBannedOffers.asMap().keySet();
+  }
+
+  synchronized void clear() {
+    offers.clear();
+    offersById.clear();
+    offersBySlave.clear();
+    offersByHost.clear();
+    staticallyBannedOffers.invalidateAll();
+    globallyBannedOffers.clear();
+  }
+
+  @VisibleForTesting
+  synchronized void cleanUpStaticallyBannedOffers() {
+    staticallyBannedOffers.cleanUp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 96b0f46..0349215 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -13,41 +13,16 @@
  */
 package org.apache.aurora.scheduler.offers;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
 
-import org.apache.aurora.common.collections.Pair;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.Offer.Operation;
 import org.apache.mesos.v1.Protos.OfferID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 
@@ -61,7 +36,7 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param offer Newly-available resource offer.
    */
-  void addOffer(HostOffer offer);
+  void add(HostOffer offer);
 
   /**
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
@@ -70,62 +45,70 @@ public interface OfferManager extends EventSubscriber {
    * @param offerId Cancelled offer.
    * @return A boolean on whether or not the offer was successfully cancelled.
    */
-  boolean cancelOffer(OfferID offerId);
+  boolean cancel(OfferID offerId);
 
   /**
    * Exclude an offer from being matched against all tasks.
    *
    * @param offerId Offer ID to ban.
    */
-  void banOffer(OfferID offerId);
+  void ban(OfferID offerId);
 
   /**
-   * Exclude an offer that results in a static mismatch from further attempts to match against all
-   * tasks from the same group.
+   * Notifies the offer queue that a host's attributes have changed.
    *
-   * @param offerId Offer ID to exclude for the given {@code groupKey}.
-   * @param groupKey Task group key to exclude.
+   * @param change State change notification.
    */
-  void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey);
+  void hostAttributesChanged(HostAttributesChanged change);
 
   /**
-   * Launches the task matched against the offer.
+   * Gets the offer for the given slave ID.
    *
-   * @param offerId Matched offer ID.
-   * @param task Matched task info.
-   * @throws LaunchException If there was an error launching the task.
+   * @param slaveId Slave ID to get the offer for.
+   * @return The offer for the slave ID.
    */
-  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
+  Optional<HostOffer> get(AgentID slaveId);
 
   /**
-   * Notifies the offer queue that a host's attributes have changed.
+   * Gets all offers that the scheduler is holding, excluding banned offers.
    *
-   * @param change State change notification.
+   * @return A snapshot of the offers that the scheduler is currently holding.
    */
-  void hostAttributesChanged(HostAttributesChanged change);
+  Iterable<HostOffer> getAll();
 
   /**
-   * Gets the offers that the scheduler is holding, excluding banned offers.
+   * Gets the offer for the given slave ID if satisfies the supplied {@link ResourceRequest}.
    *
-   * @return A snapshot of the offers that the scheduler is currently holding.
+   * @param slaveId Slave ID to get the offer for.
+   * @param resourceRequest The request that the offer should satisfy.
+   * @param revocable Whether or not the request can use revocable resources.
+   * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getOffers();
+  Optional<HostOffer> getMatching(AgentID slaveId,
+                                  ResourceRequest resourceRequest,
+                                  boolean revocable);
 
   /**
-   * Gets all offers that are not banned for the given {@code groupKey}.
+   * Gets all offers that the scheduler is holding that satisfy the supplied
+   * {@link ResourceRequest}.
    *
-   * @param groupKey Task group key to check offers for.
-   * @return A snapshot of all offers eligible for the given {@code groupKey}.
+   * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
+   * @param resourceRequest The request that the offer should satisfy.
+   * @param revocable Whether or not the request can use revocable resources.
+   * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                     ResourceRequest resourceRequest,
+                                     boolean revocable);
 
   /**
-   * Gets an offer for the given slave ID.
+   * Launches the task matched against the offer.
    *
-   * @param slaveId Slave ID to get offer for.
-   * @return An offer for the slave ID.
+   * @param offerId Matched offer ID.
+   * @param task Matched task info.
+   * @throws LaunchException If there was an error launching the task.
    */
-  Optional<HostOffer> getOffer(AgentID slaveId);
+  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
 
   /**
    * Thrown when there was an unexpected failure trying to launch a task.
@@ -140,319 +123,4 @@ public interface OfferManager extends EventSubscriber {
       super(msg, cause);
     }
   }
-
-  class OfferManagerImpl implements OfferManager {
-    @VisibleForTesting
-    static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class);
-    @VisibleForTesting
-    static final String OFFER_ACCEPT_RACES = "offer_accept_races";
-    @VisibleForTesting
-    static final String OUTSTANDING_OFFERS = "outstanding_offers";
-    @VisibleForTesting
-    static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
-    @VisibleForTesting
-    static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
-    @VisibleForTesting
-    static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
-    @VisibleForTesting
-    static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
-
-    private final HostOffers hostOffers;
-    private final AtomicLong offerRaces;
-    private final AtomicLong offerCancelFailures;
-
-    private final Driver driver;
-    private final OfferSettings offerSettings;
-    private final Deferment offerDecline;
-
-    @Inject
-    @VisibleForTesting
-    public OfferManagerImpl(
-        Driver driver,
-        OfferSettings offerSettings,
-        StatsProvider statsProvider,
-        Deferment offerDecline) {
-
-      this.driver = requireNonNull(driver);
-      this.offerSettings = requireNonNull(offerSettings);
-      this.hostOffers = new HostOffers(statsProvider, offerSettings);
-      this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
-      this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
-      this.offerDecline = requireNonNull(offerDecline);
-    }
-
-    @Override
-    public void addOffer(HostOffer offer) {
-      Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
-      if (sameAgent.isPresent()) {
-        // We have an existing offer for the same agent.  We choose to return both offers so that
-        // they may be combined into a single offer.
-        LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
-            + " for compaction.");
-        decline(offer.getOffer().getId());
-        decline(sameAgent.get().getOffer().getId());
-      } else {
-        offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
-      }
-    }
-
-    private void removeAndDecline(OfferID id) {
-      if (removeFromHostOffers(id)) {
-        decline(id);
-      }
-    }
-
-    private void decline(OfferID id) {
-      LOG.debug("Declining offer {}", id);
-      driver.declineOffer(id, getOfferFilter());
-    }
-
-    private Protos.Filters getOfferFilter() {
-      return Protos.Filters.newBuilder()
-          .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
-          .build();
-    }
-
-    @Override
-    public boolean cancelOffer(final OfferID offerId) {
-      boolean success = removeFromHostOffers(offerId);
-      if (!success) {
-        // This will happen rarely when we race to process this rescind against accepting the offer
-        // to launch a task.
-        // If it happens frequently, we are likely processing rescinds before the offer itself.
-        LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
-        this.offerCancelFailures.incrementAndGet();
-      }
-      return success;
-    }
-
-    @Override
-    public void banOffer(OfferID offerId) {
-      hostOffers.addGlobalBan(offerId);
-    }
-
-    private boolean removeFromHostOffers(final OfferID offerId) {
-      requireNonNull(offerId);
-
-      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
-      // on an offer, the master will mark the task as LOST and it will be retried.
-      return hostOffers.remove(offerId);
-    }
-
-    @Override
-    public Iterable<HostOffer> getOffers() {
-      return hostOffers.getOffers();
-    }
-
-    @Override
-    public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
-      return hostOffers.getWeaklyConsistentOffers(groupKey);
-    }
-
-    @Override
-    public Optional<HostOffer> getOffer(AgentID slaveId) {
-      return hostOffers.get(slaveId);
-    }
-
-    /**
-     * Updates the preference of a host's offers.
-     *
-     * @param change Host change notification.
-     */
-    @Subscribe
-    public void hostAttributesChanged(HostAttributesChanged change) {
-      hostOffers.updateHostAttributes(change.getAttributes());
-    }
-
-    /**
-     * Notifies the queue that the driver is disconnected, and all the stored offers are now
-     * invalid.
-     * <p>
-     * The queue takes this as a signal to flush its queue.
-     *
-     * @param event Disconnected event.
-     */
-    @Subscribe
-    public void driverDisconnected(DriverDisconnected event) {
-      LOG.info("Clearing stale offers since the driver is disconnected.");
-      hostOffers.clear();
-    }
-
-    /**
-     * Used for testing to ensure that the underlying cache's `size` method returns an accurate
-     * value by not including evicted entries.
-     */
-    @VisibleForTesting
-    public void cleanupStaticBans() {
-      hostOffers.staticallyBannedOffers.cleanUp();
-    }
-
-    /**
-     * A container for the data structures used by this class, to make it easier to reason about
-     * the different indices used and their consistency.
-     */
-    private static class HostOffers {
-
-      private final Set<HostOffer> offers;
-      private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
-      private final Map<AgentID, HostOffer> offersBySlave = Maps.newHashMap();
-      private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
-
-      // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
-      // scheduling attempts. See VetoGroup for more details on static ban.
-      private final Cache<Pair<OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
-
-      // Keep track of globally banned offers that will never be matched to anything.
-      private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
-
-      HostOffers(StatsProvider statsProvider, OfferSettings offerSettings) {
-        offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
-        staticallyBannedOffers = offerSettings
-            .getStaticBanCacheBuilder()
-            .build();
-        // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
-        // Could track this separately if it turns out to pose problems.
-        statsProvider.exportSize(OUTSTANDING_OFFERS, offers);
-        statsProvider.makeGauge(STATICALLY_BANNED_OFFERS, staticallyBannedOffers::size);
-        statsProvider.makeGauge(STATICALLY_BANNED_OFFERS_HIT_RATE,
-            () -> staticallyBannedOffers.stats().hitRate());
-        statsProvider.makeGauge(GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
-      }
-
-      synchronized Optional<HostOffer> get(AgentID slaveId) {
-        HostOffer offer = offersBySlave.get(slaveId);
-        if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
-          return Optional.absent();
-        }
-
-        return Optional.of(offer);
-      }
-
-      /**
-       * Adds an offer while maintaining a guarantee that no two offers may exist with the same
-       * agent ID.  If an offer exists with the same agent ID, the existing offer is removed
-       * and returned, and {@code offer} is not added.
-       *
-       * @param offer Offer to add.
-       * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
-       *         which will also be removed prior to returning.
-       */
-      synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
-        HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
-        if (sameAgent != null) {
-          remove(sameAgent.getOffer().getId());
-          return Optional.of(sameAgent);
-        }
-
-        addInternal(offer);
-        return Optional.absent();
-      }
-
-      private void addInternal(HostOffer offer) {
-        offers.add(offer);
-        offersById.put(offer.getOffer().getId(), offer);
-        offersBySlave.put(offer.getOffer().getAgentId(), offer);
-        offersByHost.put(offer.getOffer().getHostname(), offer);
-      }
-
-      synchronized boolean remove(OfferID id) {
-        HostOffer removed = offersById.remove(id);
-        if (removed != null) {
-          offers.remove(removed);
-          offersBySlave.remove(removed.getOffer().getAgentId());
-          offersByHost.remove(removed.getOffer().getHostname());
-        }
-        globallyBannedOffers.remove(id);
-        return removed != null;
-      }
-
-      synchronized void updateHostAttributes(IHostAttributes attributes) {
-        HostOffer offer = offersByHost.remove(attributes.getHost());
-        if (offer != null) {
-          // Remove and re-add a host's offer to re-sort based on its new hostStatus
-          remove(offer.getOffer().getId());
-          addInternal(new HostOffer(offer.getOffer(), attributes));
-        }
-      }
-
-      /**
-       * Returns an iterable giving the state of the offers at the time the method is called. Unlike
-       * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
-       * will not be modified outside of the returned iterable.
-       *
-       * @return The offers currently known by the scheduler.
-       */
-      synchronized Iterable<HostOffer> getOffers() {
-        return FluentIterable.from(offers).filter(
-            e -> !globallyBannedOffers.contains(e.getOffer().getId())
-        ).toSet();
-      }
-
-      /**
-       * Returns a weakly-consistent iterable giving the available offers to a given
-       * {@code groupKey}. This iterable can handle concurrent operations on its underlying
-       * collection, and may reflect changes that happen after the construction of the iterable.
-       * This property is mainly used in {@code launchTask}.
-       *
-       * @param groupKey The task group to get offers for.
-       * @return The offers a given task group can use.
-       */
-      synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
-        return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(e ->
-            staticallyBannedOffers.getIfPresent(Pair.of(e.getOffer().getId(), groupKey)) == null
-                && !globallyBannedOffers.contains(e.getOffer().getId())));
-      }
-
-      synchronized void addGlobalBan(OfferID offerId) {
-        globallyBannedOffers.add(offerId);
-      }
-
-      synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
-        if (offersById.containsKey(offerId)) {
-          staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
-        }
-      }
-
-      synchronized void clear() {
-        offers.clear();
-        offersById.clear();
-        offersBySlave.clear();
-        offersByHost.clear();
-        staticallyBannedOffers.invalidateAll();
-        globallyBannedOffers.clear();
-      }
-    }
-
-    @Override
-    public void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey) {
-      hostOffers.addStaticGroupBan(offerId, groupKey);
-    }
-
-    @Timed("offer_manager_launch_task")
-    @Override
-    public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
-      // Guard against an offer being removed after we grabbed it from the iterator.
-      // If that happens, the offer will not exist in hostOffers, and we can immediately
-      // send it back to LOST for quick reschedule.
-      // Removing while iterating counts on the use of a weakly-consistent iterator being used,
-      // which is a feature of ConcurrentSkipListSet.
-      if (hostOffers.remove(offerId)) {
-        try {
-          Operation launch = Operation.newBuilder()
-              .setType(Operation.Type.LAUNCH)
-              .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task))
-              .build();
-          driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
-        } catch (IllegalStateException e) {
-          // TODO(William Farner): Catch only the checked exception produced by Driver
-          // once it changes from throwing IllegalStateException when the driver is not yet
-          // registered.
-          throw new LaunchException("Failed to launch task.", e);
-        }
-      } else {
-        offerRaces.incrementAndGet();
-        throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
new file mode 100644
index 0000000..427b1b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+
+public class OfferManagerImpl implements OfferManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(org.apache.aurora.scheduler.offers.OfferManagerImpl.class);
+
+  @VisibleForTesting
+  static final String OFFER_ACCEPT_RACES = "offer_accept_races";
+  @VisibleForTesting
+  static final String OUTSTANDING_OFFERS = "outstanding_offers";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
+  @VisibleForTesting
+  static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
+  @VisibleForTesting
+  static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
+  @VisibleForTesting
+  static final String VETO_EVALUATED_OFFERS = "veto_evaluated_offers";
+
+  private final HostOffers hostOffers;
+  private final AtomicLong offerRaces;
+  private final AtomicLong offerCancelFailures;
+
+  private final Driver driver;
+  private final OfferSettings offerSettings;
+  private final Deferment offerDecline;
+
+  @Inject
+  @VisibleForTesting
+  public OfferManagerImpl(
+      Driver driver,
+      OfferSettings offerSettings,
+      StatsProvider statsProvider,
+      Deferment offerDecline,
+      SchedulingFilter schedulingFilter) {
+
+    this.driver = requireNonNull(driver);
+    this.offerSettings = requireNonNull(offerSettings);
+    this.hostOffers = new HostOffers(statsProvider, offerSettings, schedulingFilter);
+    this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
+    this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+    this.offerDecline = requireNonNull(offerDecline);
+  }
+
+  @Override
+  public void add(HostOffer offer) {
+    Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+    if (sameAgent.isPresent()) {
+      // We have an existing offer for the same agent.  We choose to return both offers so that
+      // they may be combined into a single offer.
+      LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
+          + " for compaction.");
+      decline(offer.getOffer().getId());
+      decline(sameAgent.get().getOffer().getId());
+    } else {
+      offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
+    }
+  }
+
+  private void removeAndDecline(Protos.OfferID id) {
+    if (removeFromHostOffers(id)) {
+      decline(id);
+    }
+  }
+
+  private void decline(Protos.OfferID id) {
+    LOG.debug("Declining offer {}", id);
+    driver.declineOffer(id, getOfferFilter());
+  }
+
+  private Protos.Filters getOfferFilter() {
+    return Protos.Filters.newBuilder()
+        .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
+        .build();
+  }
+
+  @Override
+  public boolean cancel(final Protos.OfferID offerId) {
+    boolean success = removeFromHostOffers(offerId);
+    if (!success) {
+      // This will happen rarely when we race to process this rescind against accepting the offer
+      // to launch a task.
+      // If it happens frequently, we are likely processing rescinds before the offer itself.
+      LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
+      this.offerCancelFailures.incrementAndGet();
+    }
+    return success;
+  }
+
+  private boolean removeFromHostOffers(final Protos.OfferID offerId) {
+    requireNonNull(offerId);
+
+    // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+    // on an offer, the master will mark the task as LOST and it will be retried.
+    return hostOffers.remove(offerId);
+  }
+
+  @Override
+  public void ban(Protos.OfferID offerId) {
+    hostOffers.addGlobalBan(offerId);
+  }
+
+  /**
+   * Updates the preference of a host's offers.
+   *
+   * @param change Host change notification.
+   */
+  @Subscribe
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
+    hostOffers.updateHostAttributes(change.getAttributes());
+  }
+
+  @Override
+  public Optional<HostOffer> get(Protos.AgentID slaveId) {
+    return hostOffers.get(slaveId);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAll() {
+    return hostOffers.getOffers();
+  }
+
+  @Override
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                         ResourceRequest resourceRequest,
+                                         boolean revocable) {
+
+    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                            ResourceRequest resourceRequest,
+                                            boolean revocable) {
+
+    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+  }
+
+  /**
+   * Notifies the queue that the driver is disconnected, and all the stored offers are now
+   * invalid.
+   * <p>
+   * The queue takes this as a signal to flush its queue.
+   *
+   * @param event Disconnected event.
+   */
+  @Subscribe
+  public void driverDisconnected(PubsubEvent.DriverDisconnected event) {
+    LOG.info("Clearing stale offers since the driver is disconnected.");
+    hostOffers.clear();
+  }
+
+  @Timed("offer_manager_launch_task")
+  @Override
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+    // Guard against an offer being removed after we grabbed it from the iterator.
+    // If that happens, the offer will not exist in hostOffers, and we can immediately
+    // send it back to LOST for quick reschedule.
+    // Removing while iterating counts on the use of a weakly-consistent iterator being used,
+    // which is a feature of ConcurrentSkipListSet.
+    if (hostOffers.remove(offerId)) {
+      try {
+        Protos.Offer.Operation launch = Protos.Offer.Operation.newBuilder()
+            .setType(Protos.Offer.Operation.Type.LAUNCH)
+            .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addTaskInfos(task))
+            .build();
+        driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
+      } catch (IllegalStateException e) {
+        // TODO(William Farner): Catch only the checked exception produced by Driver
+        // once it changes from throwing IllegalStateException when the driver is not yet
+        // registered.
+        throw new LaunchException("Failed to launch task.", e);
+      }
+    } else {
+      offerRaces.incrementAndGet();
+      throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
+    }
+  }
+
+  /**
+   * Get all static bans.
+   */
+  @VisibleForTesting
+  Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+    return hostOffers.getStaticBans();
+  }
+
+  /**
+   * Exclude an offer that results in a static mismatch from further attempts to match against all
+   * tasks from the same group.
+   */
+  @VisibleForTesting
+  void banForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    hostOffers.addStaticGroupBan(offerId, groupKey);
+  }
+
+  /**
+   * Used for testing to ensure that the underlying cache's `size` method returns an accurate
+   * value by not including evicted entries.
+   */
+  @VisibleForTesting
+  void cleanupStaticBans() {
+    hostOffers.cleanUpStaticallyBannedOffers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
new file mode 100644
index 0000000..e2e3628
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+
+import javax.inject.Qualifier;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Random;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Binding module for resource offer management.
+ */
+public class OfferManagerModule extends AbstractModule {
+  private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class);
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-hold_offers_forever",
+        description =
+            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+        arity = 1)
+    public boolean holdOffersForever = false;
+
+    @Parameter(names = "-min_offer_hold_time",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Minimum amount of time to hold a resource offer before declining.")
+    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
+
+    @Parameter(names = "-offer_hold_jitter_window",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Maximum amount of random jitter to add to the offer hold time window.")
+    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
+
+    @Parameter(names = "-offer_filter_duration",
+        description =
+            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
+                + "improves scheduling performance in smaller clusters, but might lead to resource "
+                + "starvation for other frameworks if you run many frameworks in your cluster.")
+    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
+
+    @Parameter(names = "-unavailability_threshold",
+        description =
+            "Threshold time, when running tasks should be drained from a host, before a host "
+                + "becomes unavailable. Should be greater than min_offer_hold_time + "
+                + "offer_hold_jitter_window.")
+    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
+
+    @Parameter(names = "-offer_order",
+        description =
+            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
+                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
+                + " then memory and finally would randomize any equal offers.")
+    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
+
+    @Parameter(names = "-offer_order_modules",
+        description = "Custom Guice module to provide an offer ordering.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
+
+    @Parameter(names = "-offer_static_ban_cache_max_size",
+        validateValueWith = NotNegativeNumber.class,
+        description =
+            "The number of offers to hold in the static ban cache. If no value is specified, "
+                + "the cache will grow indefinitely. However, entries will expire within "
+                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
+    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
+  }
+
+  /**
+   * Binding annotation for the threshold to veto tasks with unavailability.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface UnavailabilityThreshold { }
+
+  public static class OfferOrderModule extends AbstractModule {
+    private final CliOptions options;
+
+    public OfferOrderModule(CliOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    protected void configure() {
+      bind(new TypeLiteral<Ordering<HostOffer>>() { })
+          .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
+    }
+  }
+
+  private final CliOptions cliOptions;
+
+  public OfferManagerModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    Options options = cliOptions.offer;
+    if (!options.holdOffersForever) {
+      long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
+          + options.minOfferHoldTime.as(Time.SECONDS);
+      if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
+        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+                + " and offer_hold_jitter_window ({}). This creates risks of races between "
+                + "launching and draining",
+            options.unavailabilityThreshold,
+            options.minOfferHoldTime,
+            options.offerHoldJitterWindow);
+      }
+    }
+
+    for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
+      install(module);
+    }
+
+    bind(new TypeLiteral<Amount<Long, Time>>() { })
+        .annotatedWith(UnavailabilityThreshold.class)
+        .toInstance(options.unavailabilityThreshold);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (options.holdOffersForever) {
+          bind(Deferment.class).to(Deferment.Noop.class);
+        } else {
+          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+              new RandomJitterReturnDelay(
+                  options.minOfferHoldTime.as(Time.MILLISECONDS),
+                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+                  Random.Util.newDefaultRandom()));
+          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+        }
+
+        bind(OfferManager.class).to(OfferManagerImpl.class);
+        bind(OfferManagerImpl.class).in(Singleton.class);
+        expose(OfferManager.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
+  }
+
+  @Provides
+  @Singleton
+  OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
+    // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
+    // both maximum size of the cache and the length an offer is valid. We do this in order to
+    // satisfy requirements in both single- and multi-framework environments. If offers are held for
+    // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
+    // longest it will be valid for. Additionally, cluster operators will most likely not have to
+    // worry about cache size in this case as this behavior mimics current behavior. If offers are
+    // held indefinitely, then we never expire cache entries but the cluster operator can specify a
+    // maximum size to avoid a memory leak.
+    long maxOfferHoldTime;
+    if (cliOptions.offer.holdOffersForever) {
+      maxOfferHoldTime = Long.MAX_VALUE;
+    } else {
+      maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
+          + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
+    }
+
+    return new OfferSettings(
+        cliOptions.offer.offerFilterDuration,
+        offerOrdering,
+        Amount.of(maxOfferHoldTime, Time.SECONDS),
+        cliOptions.offer.offerStaticBanCacheMaxSize,
+        Ticker.systemTicker());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 57fc1a1..838a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Settings required to create an OfferManager.
  */
+@VisibleForTesting
 public class OfferSettings {
 
   private final Amount<Long, Time> filterDuration;
@@ -67,14 +68,14 @@ public class OfferSettings {
   /**
    * Duration after which we want Mesos to re-offer unused or declined resources.
    */
-  public Amount<Long, Time> getFilterDuration() {
+  Amount<Long, Time> getFilterDuration() {
     return filterDuration;
   }
 
   /**
    * The ordering to use when fetching offers from OfferManager.
    */
-  public Ordering<HostOffer> getOrdering() {
+  Ordering<HostOffer> getOrdering() {
     return ordering;
   }
 
@@ -82,7 +83,7 @@ public class OfferSettings {
    * The builder for the static ban cache. Cache settings (e.g. max size, entry expiration) should
    * already be added to the builder by this point.
    */
-  public CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
+  CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
     return staticBanCacheBuilder;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
deleted file mode 100644
index 4a6ea8d..0000000
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.offers;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-
-import javax.inject.Qualifier;
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.util.Random;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.app.MoreModules;
-import org.apache.aurora.scheduler.config.CliOptions;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Binding module for resource offer management.
- */
-public class OffersModule extends AbstractModule {
-  private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class);
-
-  @Parameters(separators = "=")
-  public static class Options {
-    @Parameter(names = "-hold_offers_forever",
-        description =
-            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
-        arity = 1)
-    public boolean holdOffersForever = false;
-
-    @Parameter(names = "-min_offer_hold_time",
-        validateValueWith = NotNegativeAmount.class,
-        description = "Minimum amount of time to hold a resource offer before declining.")
-    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
-
-    @Parameter(names = "-offer_hold_jitter_window",
-        validateValueWith = NotNegativeAmount.class,
-        description = "Maximum amount of random jitter to add to the offer hold time window.")
-    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
-
-    @Parameter(names = "-offer_filter_duration",
-        description =
-            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
-                + "improves scheduling performance in smaller clusters, but might lead to resource "
-                + "starvation for other frameworks if you run many frameworks in your cluster.")
-    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
-
-    @Parameter(names = "-unavailability_threshold",
-        description =
-            "Threshold time, when running tasks should be drained from a host, before a host "
-                + "becomes unavailable. Should be greater than min_offer_hold_time + "
-                + "offer_hold_jitter_window.")
-    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
-
-    @Parameter(names = "-offer_order",
-        description =
-            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
-                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
-                + " then memory and finally would randomize any equal offers.")
-    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
-
-    @Parameter(names = "-offer_order_modules",
-        description = "Custom Guice module to provide an offer ordering.")
-    @SuppressWarnings("rawtypes")
-    public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
-
-    @Parameter(names = "-offer_static_ban_cache_max_size",
-        validateValueWith = NotNegativeNumber.class,
-        description =
-            "The number of offers to hold in the static ban cache. If no value is specified, "
-                + "the cache will grow indefinitely. However, entries will expire within "
-                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
-    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
-  }
-
-  public static class OfferOrderModule extends AbstractModule {
-    private final CliOptions options;
-
-    public OfferOrderModule(CliOptions options) {
-      this.options = options;
-    }
-
-    @Override
-    protected void configure() {
-      bind(new TypeLiteral<Ordering<HostOffer>>() { })
-          .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
-    }
-  }
-
-  /**
-   * Binding annotation for the threshold to veto tasks with unavailability.
-   */
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface UnavailabilityThreshold { }
-
-  private final CliOptions cliOptions;
-
-  public OffersModule(CliOptions cliOptions) {
-    this.cliOptions = cliOptions;
-  }
-
-  @Override
-  protected void configure() {
-    Options options = cliOptions.offer;
-    if (!options.holdOffersForever) {
-      long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
-          + options.minOfferHoldTime.as(Time.SECONDS);
-      if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
-        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
-                + " and offer_hold_jitter_window ({}). This creates risks of races between "
-                + "launching and draining",
-            options.unavailabilityThreshold,
-            options.minOfferHoldTime,
-            options.offerHoldJitterWindow);
-      }
-    }
-
-    for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
-      install(module);
-    }
-
-    bind(new TypeLiteral<Amount<Long, Time>>() { })
-        .annotatedWith(UnavailabilityThreshold.class)
-        .toInstance(options.unavailabilityThreshold);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        if (options.holdOffersForever) {
-          bind(Deferment.class).to(Deferment.Noop.class);
-        } else {
-          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
-              new RandomJitterReturnDelay(
-                  options.minOfferHoldTime.as(Time.MILLISECONDS),
-                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
-                  Random.Util.newDefaultRandom()));
-          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
-        }
-
-        bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
-        bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
-        expose(OfferManager.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
-  }
-
-  @Provides
-  @Singleton
-  OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
-    // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
-    // both maximum size of the cache and the length an offer is valid. We do this in order to
-    // satisfy requirements in both single- and multi-framework environments. If offers are held for
-    // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
-    // longest it will be valid for. Additionally, cluster operators will most likely not have to
-    // worry about cache size in this case as this behavior mimics current behavior. If offers are
-    // held indefinitely, then we never expire cache entries but the cluster operator can specify a
-    // maximum size to avoid a memory leak.
-    long maxOfferHoldTime;
-    if (cliOptions.offer.holdOffersForever) {
-      maxOfferHoldTime = Long.MAX_VALUE;
-    } else {
-      maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
-          + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
-    }
-
-    return new OfferSettings(
-        cliOptions.offer.offerFilterDuration,
-        offerOrdering,
-        Amount.of(maxOfferHoldTime, Time.SECONDS),
-        cliOptions.offer.offerStaticBanCacheMaxSize,
-        Ticker.systemTicker());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index 766d3b2..497a766 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -143,7 +143,7 @@ public class PendingTaskProcessor implements Runnable {
 
       // Group the offers by slave id so they can be paired with active tasks from the same slave.
       Map<String, HostOffer> slavesToOffers =
-          Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+          Maps.uniqueIndex(offerManager.getAll(), OFFER_TO_SLAVE_ID);
 
       Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
           slavesToOffers.keySet(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 82a0ff6..ffb8b90 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -94,7 +94,7 @@ public interface Preemptor {
                 pendingTask.getTask(),
                 slot.getVictims(),
                 jobState,
-                offerManager.getOffer(slaveId),
+                offerManager.get(slaveId),
                 store);
 
         metrics.recordSlotValidationResult(validatedVictims);