You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/28 19:02:37 UTC

[3/3] aurora git commit: Enable custom offer scoring modules for task assignment

Enable custom offer scoring modules for task assignment

Major portions of the refactor:

* Refactor `OfferManager` to do filtering of offers (added `getMatching` and
  `getAllMatching` methods) as opposed to TaskAssigner
* Refactor `TaskAssigner`, allow for injection of custom "scoring" class
  through `OfferRanker` interface

And some minor things as well:

* Moved `TaskAssignerImpl`, `TaskSchedulerImpl`, and `HostOffers` into their own
  upper-level classes
* Moved `TaskAssigner` to the `scheduling` package and out of the `state` package
* Renamed some methods in `OfferManager` to avoid code stutter
* Renaming of some classes (e.g. `FirstFitTaskAssigner` -> `TaskAssignerImpl`)
* And a slew of others

Reviewed at https://reviews.apache.org/r/63973/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/80139da4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/80139da4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/80139da4

Branch: refs/heads/master
Commit: 80139da4624916e406c7e80c4ea2d286d4d859c3
Parents: 21af250
Author: Jordan Ly <jo...@gmail.com>
Authored: Tue Nov 28 11:02:14 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Nov 28 11:02:14 2017 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   3 +
 docs/reference/scheduler-configuration.md       |   2 +-
 .../org/apache/aurora/benchmark/Offers.java     |   2 +-
 .../aurora/benchmark/SchedulingBenchmarks.java  |  16 +-
 .../benchmark/fakes/FakeOfferManager.java       |  31 +-
 .../apache/aurora/scheduler/app/AppModule.java  |   4 +-
 .../aurora/scheduler/config/CliOptions.java     |   6 +-
 .../scheduler/filter/AttributeAggregate.java    |  38 +-
 .../scheduler/filter/SchedulingFilter.java      |   5 +
 .../scheduler/filter/SchedulingFilterImpl.java  |   2 +-
 .../apache/aurora/scheduler/http/Offers.java    |   2 +-
 .../scheduler/mesos/MesosCallbackHandler.java   |  12 +-
 .../aurora/scheduler/offers/HostOffers.java     | 253 +++++++++
 .../aurora/scheduler/offers/OfferManager.java   | 404 ++------------
 .../scheduler/offers/OfferManagerImpl.java      | 246 +++++++++
 .../scheduler/offers/OfferManagerModule.java    | 211 ++++++++
 .../aurora/scheduler/offers/OfferSettings.java  |   7 +-
 .../aurora/scheduler/offers/OffersModule.java   | 211 --------
 .../preemptor/PendingTaskProcessor.java         |   2 +-
 .../aurora/scheduler/preemptor/Preemptor.java   |   2 +-
 .../scheduling/FirstFitOfferSelector.java       |  29 +
 .../scheduling/FirstFitOfferSelectorModule.java |  26 +
 .../scheduler/scheduling/OfferSelector.java     |  36 ++
 .../scheduler/scheduling/SchedulingModule.java  |   4 +-
 .../scheduler/scheduling/TaskAssigner.java      |  46 ++
 .../scheduler/scheduling/TaskAssignerImpl.java  | 284 ++++++++++
 .../scheduling/TaskAssignerImplModule.java      |  59 ++
 .../scheduler/scheduling/TaskScheduler.java     | 191 -------
 .../scheduler/scheduling/TaskSchedulerImpl.java | 207 +++++++
 .../state/FirstFitTaskAssignerModule.java       |  31 --
 .../aurora/scheduler/state/StateModule.java     |   3 +-
 .../aurora/scheduler/state/TaskAssigner.java    | 338 ------------
 .../scheduler/stats/AsyncStatsModule.java       |   2 +-
 .../scheduler/config/CommandLineTest.java       |   4 +-
 .../aurora/scheduler/http/OffersTest.java       |   4 +-
 .../mesos/MesosCallbackHandlerTest.java         |  20 +-
 .../scheduler/offers/OfferManagerImplTest.java  | 381 +++++++++----
 .../preemptor/PendingTaskProcessorTest.java     |   2 +-
 .../scheduler/preemptor/PreemptorImplTest.java  |   2 +-
 .../preemptor/PreemptorModuleTest.java          |   2 +-
 .../scheduling/FirstFitOfferSelectorTest.java   |  66 +++
 .../scheduling/TaskAssignerImplTest.java        | 374 +++++++++++++
 .../scheduling/TaskSchedulerImplTest.java       |   2 -
 .../state/FirstFitTaskAssignerTest.java         | 539 -------------------
 .../scheduler/stats/AsyncStatsModuleTest.java   |   2 +-
 45 files changed, 2244 insertions(+), 1869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2d3c423..54dcc75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -12,6 +12,9 @@
   a production cluster. For that reason, the functionality is behind a new flag `-partition_aware`
   that is disabled by default. When Mesos support is improved and the new behavior is vetted in
   production clusters, we'll enable this by default.
+- Added the ability to "score" offers for a given scheduling assignment via the `OfferSelector`
+  interface. The default implementation is first fit, but cluster operators can inject a custom
+  scoring algorithm through the `-offer_selector_modules` flag.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 6c385b5..f697b6f 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -222,7 +222,7 @@ Optional flags:
 	Time for a stat to be retained in memory before expiring.
 -stat_sampling_interval (default (1, secs))
 	Statistic value sampling interval.
--task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule])
+-task_assigner_modules (default [class org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule])
   Guice modules for replacing task assignment logic.
 -thermos_executor_cpu (default 0.25)
 	The number of CPU cores to allocate for each instance of the executor.

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index 2b46326..2fcc804 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -50,7 +50,7 @@ final class Offers {
    */
   static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) {
     for (HostOffer offer : offers) {
-      offerManager.addOffer(offer);
+      offerManager.add(offer);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 1708a50..58e3224 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -62,15 +62,17 @@ import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.aurora.scheduler.offers.Deferment;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.offers.OfferOrder;
 import org.apache.aurora.scheduler.offers.OfferSettings;
-import org.apache.aurora.scheduler.offers.OffersModule;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -147,8 +149,8 @@ public class SchedulingBenchmarks {
               bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class)
                   .toInstance(new NoopExecutor());
               bind(Deferment.class).to(Deferment.Noop.class);
-              bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
-              bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+              bind(OfferManager.class).to(OfferManagerImpl.class);
+              bind(OfferManagerImpl.class).in(Singleton.class);
               bind(OfferSettings.class).toInstance(
                   new OfferSettings(NO_DELAY,
                       ImmutableList.of(OfferOrder.RANDOM),
@@ -157,8 +159,8 @@ public class SchedulingBenchmarks {
                       new FakeTicker()));
               bind(BiCache.BiCacheSettings.class).toInstance(
                   new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
-              bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
-              bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+              bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+              bind(TaskSchedulerImpl.class).in(Singleton.class);
               expose(TaskScheduler.class);
               expose(OfferManager.class);
             }
@@ -171,7 +173,7 @@ public class SchedulingBenchmarks {
                   .toInstance(DELAY_FOREVER);
               bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
               bind(new TypeLiteral<Amount<Long, Time>>() { })
-                  .annotatedWith(OffersModule.UnavailabilityThreshold.class)
+                  .annotatedWith(OfferManagerModule.UnavailabilityThreshold.class)
                   .toInstance(Amount.of(1L, Time.MINUTES));
               bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
               bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index 201aa81..05c58ab 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -18,22 +18,23 @@ import com.google.common.base.Optional;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.mesos.v1.Protos;
 
 public class FakeOfferManager implements OfferManager {
   @Override
-  public void addOffer(HostOffer offer) {
+  public void add(HostOffer offer) {
     // no-op
   }
 
   @Override
-  public boolean cancelOffer(Protos.OfferID offerId) {
+  public boolean cancel(Protos.OfferID offerId) {
     return false;
   }
 
   @Override
-  public void banOffer(Protos.OfferID offerId) {
+  public void ban(Protos.OfferID offerId) {
     // no-op
   }
 
@@ -43,27 +44,33 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public void banOfferForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
     // no-op
   }
 
   @Override
-  public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
-    return null;
+  public Optional<HostOffer> get(Protos.AgentID agentId) {
+    return Optional.absent();
   }
 
   @Override
-  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
-    // no-op
+  public Iterable<HostOffer> getAll() {
+    return null;
   }
 
   @Override
-  public Iterable<HostOffer> getOffers() {
-    return null;
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                         ResourceRequest resourceRequest,
+                                         boolean revocable) {
+
+    return Optional.absent();
   }
 
   @Override
-  public Optional<HostOffer> getOffer(Protos.AgentID agentId) {
-    return Optional.absent();
+  public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                            ResourceRequest resourceRequest,
+                                            boolean revocable) {
+
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 3204cca..817a019 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,7 +47,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
 import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.pruning.PruningModule;
 import org.apache.aurora.scheduler.quota.QuotaModule;
@@ -172,7 +172,7 @@ public class AppModule extends AbstractModule {
 
     install(new PubsubEventModule());
     install(new AsyncModule(options.async));
-    install(new OffersModule(options));
+    install(new OfferManagerModule(options));
     install(new PruningModule(options.pruning));
     install(new ReconciliationModule(options.reconciliation));
     install(new SchedulingModule(options.scheduling));

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index d4537e3..b7f43e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -36,12 +36,13 @@ import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule;
 import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
 import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.pruning.PruningModule;
 import org.apache.aurora.scheduler.reconciliation.ReconciliationModule;
 import org.apache.aurora.scheduler.resources.ResourceSettings;
 import org.apache.aurora.scheduler.scheduling.SchedulingModule;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
 import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
@@ -54,7 +55,7 @@ import org.apache.aurora.scheduler.updater.UpdaterModule;
 public class CliOptions {
   public final ReconciliationModule.Options reconciliation =
       new ReconciliationModule.Options();
-  public final OffersModule.Options offer = new OffersModule.Options();
+  public final OfferManagerModule.Options offer = new OfferManagerModule.Options();
   public final ExecutorModule.Options executor = new ExecutorModule.Options();
   public final AppModule.Options app = new AppModule.Options();
   public final SchedulerMain.Options main = new SchedulerMain.Options();
@@ -84,6 +85,7 @@ public class CliOptions {
   public final StatsModule.Options stats = new StatsModule.Options();
   public final CronModule.Options cron = new CronModule.Options();
   public final ResourceSettings resourceSettings = new ResourceSettings();
+  public final TaskAssignerImplModule.Options taskAssigner = new TaskAssignerImplModule.Options();
   final List<Object> custom;
 
   public CliOptions() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 60f141d..a5acafa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -49,19 +49,8 @@ public final class AttributeAggregate {
    */
   private Supplier<Multiset<Pair<String, String>>> aggregate;
 
-  private boolean isInitialized = false;
-
   private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
-    this.aggregate = Suppliers.memoize(
-        () -> {
-          initialize();
-          return aggregate.get();
-        }
-    );
-  }
-
-  private void initialize() {
-    isInitialized = true; // inlining this assignment yields a PMD false positive
+    this.aggregate = Suppliers.memoize(aggregate);
   }
 
   /**
@@ -123,21 +112,16 @@ public final class AttributeAggregate {
   }
 
   public void updateAttributeAggregate(IHostAttributes attributes) {
-    // If the aggregate supplier has not been populated there is no need to update it here.
-    // All tasks attributes will be picked up by the wrapped task query if executed at a
-    // later point in time.
-    if (isInitialized) {
-      final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
-      aggregate = Suppliers.memoize(
-          () -> {
-            ImmutableMultiset.Builder<Pair<String, String>> builder
-                = new ImmutableMultiset.Builder<>();
-            builder.addAll(previous.get());
-            addAttributes(builder, attributes.getAttributes());
-            return builder.build();
-          }
-      );
-    }
+    final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
+    aggregate = Suppliers.memoize(
+        () -> {
+          ImmutableMultiset.Builder<Pair<String, String>> builder
+              = new ImmutableMultiset.Builder<>();
+          builder.addAll(previous.get());
+          addAttributes(builder, attributes.getAttributes());
+          return builder.build();
+        }
+    );
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 36608a9..a00c095 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -258,6 +259,10 @@ public interface SchedulingFilter {
       this(offer, attributes, Optional.absent());
     }
 
+    public UnusedResource(HostOffer offer, boolean revocable) {
+      this(offer.getResourceBag(revocable), offer.getAttributes(), offer.getUnavailabilityStart());
+    }
+
     public UnusedResource(ResourceBag offer, IHostAttributes attributes, Optional<Instant> start) {
       this.offer = offer;
       this.attributes = attributes;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index df51d4c..41a0764 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold;
+import org.apache.aurora.scheduler.offers.OfferManagerModule.UnavailabilityThreshold;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index f22ca6e..bb92cd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -59,7 +59,7 @@ public class Offers {
   public Response getOffers() throws JsonProcessingException {
     return Response.ok(
         mapper.writeValueAsString(
-            StreamSupport.stream(offerManager.getOffers().spliterator(), false)
+            StreamSupport.stream(offerManager.getAll().spliterator(), false)
                 .map(o -> o.getOffer())
                 .collect(Collectors.toList())))
         .build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index fd5874d..87e702f 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -40,7 +40,7 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -126,7 +126,7 @@ public interface MesosCallbackHandler {
         Driver driver,
         Clock clock,
         MaintenanceController controller,
-        @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
+        @OfferManagerModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
         @PubsubEventModule.RegisteredEvents EventSink registeredEventSink) {
 
       this(
@@ -226,7 +226,7 @@ public interface MesosCallbackHandler {
             storeProvider.getAttributeStore().saveHostAttributes(attributes);
             log.info("Received offer: {}", offer.getId().getValue());
             offersReceived.incrementAndGet();
-            offerManager.addOffer(new HostOffer(offer, attributes));
+            offerManager.add(new HostOffer(offer, attributes));
           }
         });
       });
@@ -244,15 +244,15 @@ public interface MesosCallbackHandler {
       //      In this scenario, we want to ensure that we do not use it/accept it when the executor
       //      finally processes the offer. We will temporarily ban it and add a command for the
       //      executor to unban it so future offers can be processed normally.
-      boolean offerCancelled = offerManager.cancelOffer(offerId);
+      boolean offerCancelled = offerManager.cancel(offerId);
       if (!offerCancelled) {
         log.info(
             "Received rescind before adding offer: {}, temporarily banning.",
             offerId.getValue());
-        offerManager.banOffer(offerId);
+        offerManager.ban(offerId);
         executor.execute(() -> {
           log.info("Cancelling and unbanning offer: {}.", offerId.getValue());
-          offerManager.cancelOffer(offerId);
+          offerManager.cancel(offerId);
         });
       }
       offersRescinded.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
new file mode 100644
index 0000000..8adbcb1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A container for the data structures used by this {@link OfferManagerImpl}, to make it easier to
+ * reason about the different indices used and their consistency.
+ */
+class HostOffers {
+  private final Set<HostOffer> offers;
+
+  private final Map<Protos.OfferID, HostOffer> offersById = Maps.newHashMap();
+  private final Map<Protos.AgentID, HostOffer> offersBySlave = Maps.newHashMap();
+  private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+
+  // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+  // scheduling attempts. See VetoGroup for more details on static ban.
+  private final Cache<Pair<Protos.OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
+  private final SchedulingFilter schedulingFilter;
+
+  // Keep track of globally banned offers that will never be matched to anything.
+  private final Set<Protos.OfferID> globallyBannedOffers = Sets.newHashSet();
+
+  // Keep track of the number of offers evaluated for vetoes when getting matching offers
+  private final AtomicLong vetoEvaluatedOffers;
+
+  HostOffers(StatsProvider statsProvider,
+             OfferSettings offerSettings,
+             SchedulingFilter schedulingFilter) {
+    this.offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
+    this.staticallyBannedOffers = offerSettings
+        .getStaticBanCacheBuilder()
+        .build();
+    this.schedulingFilter = requireNonNull(schedulingFilter);
+
+    // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
+    // Could track this separately if it turns out to pose problems.
+    statsProvider.exportSize(OfferManagerImpl.OUTSTANDING_OFFERS, offers);
+    statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS,
+        staticallyBannedOffers::size);
+    statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS_HIT_RATE,
+        () -> staticallyBannedOffers.stats().hitRate());
+    statsProvider.makeGauge(OfferManagerImpl.GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
+
+    vetoEvaluatedOffers = statsProvider.makeCounter(OfferManagerImpl.VETO_EVALUATED_OFFERS);
+  }
+
+  /**
+   * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+   * agent ID.  If an offer exists with the same agent ID, the existing offer is removed
+   * and returned, and {@code offer} is not added.
+   *
+   * @param offer Offer to add.
+   * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+   *         which will also be removed prior to returning.
+   */
+  synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+    HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+    if (sameAgent != null) {
+      remove(sameAgent.getOffer().getId());
+      return Optional.of(sameAgent);
+    }
+
+    addInternal(offer);
+    return Optional.absent();
+  }
+
+  private void addInternal(HostOffer offer) {
+    offers.add(offer);
+    offersById.put(offer.getOffer().getId(), offer);
+    offersBySlave.put(offer.getOffer().getAgentId(), offer);
+    offersByHost.put(offer.getOffer().getHostname(), offer);
+  }
+
+  synchronized boolean remove(Protos.OfferID id) {
+    HostOffer removed = offersById.remove(id);
+    if (removed != null) {
+      offers.remove(removed);
+      offersBySlave.remove(removed.getOffer().getAgentId());
+      offersByHost.remove(removed.getOffer().getHostname());
+    }
+    globallyBannedOffers.remove(id);
+    return removed != null;
+  }
+
+  synchronized void addGlobalBan(Protos.OfferID offerId) {
+    globallyBannedOffers.add(offerId);
+  }
+
+  synchronized void updateHostAttributes(IHostAttributes attributes) {
+    HostOffer offer = offersByHost.remove(attributes.getHost());
+    if (offer != null) {
+      // Remove and re-add a host's offer to re-sort based on its new hostStatus
+      remove(offer.getOffer().getId());
+      addInternal(new HostOffer(offer.getOffer(), attributes));
+    }
+  }
+
+  synchronized Optional<HostOffer> get(Protos.AgentID slaveId) {
+    HostOffer offer = offersBySlave.get(slaveId);
+    if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
+      return Optional.absent();
+    }
+
+    return Optional.of(offer);
+  }
+
+  /**
+   * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+   * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
+   * will not be modified outside of the returned iterable.
+   *
+   * @return The offers currently known by the scheduler.
+   */
+  synchronized Iterable<HostOffer> getOffers() {
+    return FluentIterable.from(offers)
+        .filter(o -> !globallyBannedOffers.contains(o.getOffer().getId()))
+        .toSet();
+  }
+
+  synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                               ResourceRequest resourceRequest,
+                                               boolean revocable) {
+
+    Optional<HostOffer> optionalOffer = get(slaveId);
+    if (optionalOffer.isPresent()) {
+      HostOffer offer = optionalOffer.get();
+
+      if (isGloballyBanned(offer)
+          || isVetoed(offer, resourceRequest, revocable, Optional.absent())) {
+
+        return Optional.absent();
+      }
+    }
+
+    return optionalOffer;
+  }
+
+  /**
+   * Returns a weakly-consistent iterable giving the available offers to a given
+   * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+   * collection, and may reflect changes that happen after the construction of the iterable.
+   * This property is mainly used in {@code launchTask}.
+   *
+   * @param groupKey The task group to get offers for.
+   * @return The offers a given task group can use.
+   */
+  synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                                  ResourceRequest resourceRequest,
+                                                  boolean revocable) {
+
+    return Iterables.unmodifiableIterable(FluentIterable.from(offers)
+        .filter(o -> !isGloballyBanned(o))
+        .filter(o -> !isStaticallyBanned(o, groupKey))
+        .filter(HostOffer::hasCpuAndMem)
+        .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+  }
+
+  private synchronized boolean isGloballyBanned(HostOffer offer) {
+    return globallyBannedOffers.contains(offer.getOffer().getId());
+  }
+
+  private synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
+    return staticallyBannedOffers.getIfPresent(Pair.of(offer.getOffer().getId(), groupKey)) != null;
+  }
+
+  /**
+   * Determine whether or not the {@link HostOffer} is vetoed for the given {@link ResourceRequest}.
+   * If {@code groupKey} is present, this method will also temporarily ban the offer from ever
+   * matching the {@link TaskGroupKey}.
+   */
+  private boolean isVetoed(HostOffer offer,
+                           ResourceRequest resourceRequest,
+                           boolean revocable,
+                           Optional<TaskGroupKey> groupKey) {
+
+    vetoEvaluatedOffers.incrementAndGet();
+    UnusedResource unusedResource = new UnusedResource(offer, revocable);
+    Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
+    if (!vetoes.isEmpty()) {
+      if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {
+        addStaticGroupBan(offer.getOffer().getId(), groupKey.get());
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  @VisibleForTesting
+  synchronized void addStaticGroupBan(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    if (offersById.containsKey(offerId)) {
+      staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
+    }
+  }
+
+  @VisibleForTesting
+  synchronized Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+    return staticallyBannedOffers.asMap().keySet();
+  }
+
+  synchronized void clear() {
+    offers.clear();
+    offersById.clear();
+    offersBySlave.clear();
+    offersByHost.clear();
+    staticallyBannedOffers.invalidateAll();
+    globallyBannedOffers.clear();
+  }
+
+  @VisibleForTesting
+  synchronized void cleanUpStaticallyBannedOffers() {
+    staticallyBannedOffers.cleanUp();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 96b0f46..0349215 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -13,41 +13,16 @@
  */
 package org.apache.aurora.scheduler.offers;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
 
-import org.apache.aurora.common.collections.Pair;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.Offer.Operation;
 import org.apache.mesos.v1.Protos.OfferID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 
@@ -61,7 +36,7 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param offer Newly-available resource offer.
    */
-  void addOffer(HostOffer offer);
+  void add(HostOffer offer);
 
   /**
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
@@ -70,62 +45,70 @@ public interface OfferManager extends EventSubscriber {
    * @param offerId Cancelled offer.
    * @return A boolean on whether or not the offer was successfully cancelled.
    */
-  boolean cancelOffer(OfferID offerId);
+  boolean cancel(OfferID offerId);
 
   /**
    * Exclude an offer from being matched against all tasks.
    *
    * @param offerId Offer ID to ban.
    */
-  void banOffer(OfferID offerId);
+  void ban(OfferID offerId);
 
   /**
-   * Exclude an offer that results in a static mismatch from further attempts to match against all
-   * tasks from the same group.
+   * Notifies the offer queue that a host's attributes have changed.
    *
-   * @param offerId Offer ID to exclude for the given {@code groupKey}.
-   * @param groupKey Task group key to exclude.
+   * @param change State change notification.
    */
-  void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey);
+  void hostAttributesChanged(HostAttributesChanged change);
 
   /**
-   * Launches the task matched against the offer.
+   * Gets the offer for the given slave ID.
    *
-   * @param offerId Matched offer ID.
-   * @param task Matched task info.
-   * @throws LaunchException If there was an error launching the task.
+   * @param slaveId Slave ID to get the offer for.
+   * @return The offer for the slave ID.
    */
-  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
+  Optional<HostOffer> get(AgentID slaveId);
 
   /**
-   * Notifies the offer queue that a host's attributes have changed.
+   * Gets all offers that the scheduler is holding, excluding banned offers.
    *
-   * @param change State change notification.
+   * @return A snapshot of the offers that the scheduler is currently holding.
    */
-  void hostAttributesChanged(HostAttributesChanged change);
+  Iterable<HostOffer> getAll();
 
   /**
-   * Gets the offers that the scheduler is holding, excluding banned offers.
+   * Gets the offer for the given slave ID if satisfies the supplied {@link ResourceRequest}.
    *
-   * @return A snapshot of the offers that the scheduler is currently holding.
+   * @param slaveId Slave ID to get the offer for.
+   * @param resourceRequest The request that the offer should satisfy.
+   * @param revocable Whether or not the request can use revocable resources.
+   * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getOffers();
+  Optional<HostOffer> getMatching(AgentID slaveId,
+                                  ResourceRequest resourceRequest,
+                                  boolean revocable);
 
   /**
-   * Gets all offers that are not banned for the given {@code groupKey}.
+   * Gets all offers that the scheduler is holding that satisfy the supplied
+   * {@link ResourceRequest}.
    *
-   * @param groupKey Task group key to check offers for.
-   * @return A snapshot of all offers eligible for the given {@code groupKey}.
+   * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
+   * @param resourceRequest The request that the offer should satisfy.
+   * @param revocable Whether or not the request can use revocable resources.
+   * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                     ResourceRequest resourceRequest,
+                                     boolean revocable);
 
   /**
-   * Gets an offer for the given slave ID.
+   * Launches the task matched against the offer.
    *
-   * @param slaveId Slave ID to get offer for.
-   * @return An offer for the slave ID.
+   * @param offerId Matched offer ID.
+   * @param task Matched task info.
+   * @throws LaunchException If there was an error launching the task.
    */
-  Optional<HostOffer> getOffer(AgentID slaveId);
+  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
 
   /**
    * Thrown when there was an unexpected failure trying to launch a task.
@@ -140,319 +123,4 @@ public interface OfferManager extends EventSubscriber {
       super(msg, cause);
     }
   }
-
-  class OfferManagerImpl implements OfferManager {
-    @VisibleForTesting
-    static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class);
-    @VisibleForTesting
-    static final String OFFER_ACCEPT_RACES = "offer_accept_races";
-    @VisibleForTesting
-    static final String OUTSTANDING_OFFERS = "outstanding_offers";
-    @VisibleForTesting
-    static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
-    @VisibleForTesting
-    static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
-    @VisibleForTesting
-    static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
-    @VisibleForTesting
-    static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
-
-    private final HostOffers hostOffers;
-    private final AtomicLong offerRaces;
-    private final AtomicLong offerCancelFailures;
-
-    private final Driver driver;
-    private final OfferSettings offerSettings;
-    private final Deferment offerDecline;
-
-    @Inject
-    @VisibleForTesting
-    public OfferManagerImpl(
-        Driver driver,
-        OfferSettings offerSettings,
-        StatsProvider statsProvider,
-        Deferment offerDecline) {
-
-      this.driver = requireNonNull(driver);
-      this.offerSettings = requireNonNull(offerSettings);
-      this.hostOffers = new HostOffers(statsProvider, offerSettings);
-      this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
-      this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
-      this.offerDecline = requireNonNull(offerDecline);
-    }
-
-    @Override
-    public void addOffer(HostOffer offer) {
-      Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
-      if (sameAgent.isPresent()) {
-        // We have an existing offer for the same agent.  We choose to return both offers so that
-        // they may be combined into a single offer.
-        LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
-            + " for compaction.");
-        decline(offer.getOffer().getId());
-        decline(sameAgent.get().getOffer().getId());
-      } else {
-        offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
-      }
-    }
-
-    private void removeAndDecline(OfferID id) {
-      if (removeFromHostOffers(id)) {
-        decline(id);
-      }
-    }
-
-    private void decline(OfferID id) {
-      LOG.debug("Declining offer {}", id);
-      driver.declineOffer(id, getOfferFilter());
-    }
-
-    private Protos.Filters getOfferFilter() {
-      return Protos.Filters.newBuilder()
-          .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
-          .build();
-    }
-
-    @Override
-    public boolean cancelOffer(final OfferID offerId) {
-      boolean success = removeFromHostOffers(offerId);
-      if (!success) {
-        // This will happen rarely when we race to process this rescind against accepting the offer
-        // to launch a task.
-        // If it happens frequently, we are likely processing rescinds before the offer itself.
-        LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
-        this.offerCancelFailures.incrementAndGet();
-      }
-      return success;
-    }
-
-    @Override
-    public void banOffer(OfferID offerId) {
-      hostOffers.addGlobalBan(offerId);
-    }
-
-    private boolean removeFromHostOffers(final OfferID offerId) {
-      requireNonNull(offerId);
-
-      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
-      // on an offer, the master will mark the task as LOST and it will be retried.
-      return hostOffers.remove(offerId);
-    }
-
-    @Override
-    public Iterable<HostOffer> getOffers() {
-      return hostOffers.getOffers();
-    }
-
-    @Override
-    public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
-      return hostOffers.getWeaklyConsistentOffers(groupKey);
-    }
-
-    @Override
-    public Optional<HostOffer> getOffer(AgentID slaveId) {
-      return hostOffers.get(slaveId);
-    }
-
-    /**
-     * Updates the preference of a host's offers.
-     *
-     * @param change Host change notification.
-     */
-    @Subscribe
-    public void hostAttributesChanged(HostAttributesChanged change) {
-      hostOffers.updateHostAttributes(change.getAttributes());
-    }
-
-    /**
-     * Notifies the queue that the driver is disconnected, and all the stored offers are now
-     * invalid.
-     * <p>
-     * The queue takes this as a signal to flush its queue.
-     *
-     * @param event Disconnected event.
-     */
-    @Subscribe
-    public void driverDisconnected(DriverDisconnected event) {
-      LOG.info("Clearing stale offers since the driver is disconnected.");
-      hostOffers.clear();
-    }
-
-    /**
-     * Used for testing to ensure that the underlying cache's `size` method returns an accurate
-     * value by not including evicted entries.
-     */
-    @VisibleForTesting
-    public void cleanupStaticBans() {
-      hostOffers.staticallyBannedOffers.cleanUp();
-    }
-
-    /**
-     * A container for the data structures used by this class, to make it easier to reason about
-     * the different indices used and their consistency.
-     */
-    private static class HostOffers {
-
-      private final Set<HostOffer> offers;
-      private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
-      private final Map<AgentID, HostOffer> offersBySlave = Maps.newHashMap();
-      private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
-
-      // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
-      // scheduling attempts. See VetoGroup for more details on static ban.
-      private final Cache<Pair<OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
-
-      // Keep track of globally banned offers that will never be matched to anything.
-      private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
-
-      HostOffers(StatsProvider statsProvider, OfferSettings offerSettings) {
-        offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
-        staticallyBannedOffers = offerSettings
-            .getStaticBanCacheBuilder()
-            .build();
-        // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
-        // Could track this separately if it turns out to pose problems.
-        statsProvider.exportSize(OUTSTANDING_OFFERS, offers);
-        statsProvider.makeGauge(STATICALLY_BANNED_OFFERS, staticallyBannedOffers::size);
-        statsProvider.makeGauge(STATICALLY_BANNED_OFFERS_HIT_RATE,
-            () -> staticallyBannedOffers.stats().hitRate());
-        statsProvider.makeGauge(GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
-      }
-
-      synchronized Optional<HostOffer> get(AgentID slaveId) {
-        HostOffer offer = offersBySlave.get(slaveId);
-        if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
-          return Optional.absent();
-        }
-
-        return Optional.of(offer);
-      }
-
-      /**
-       * Adds an offer while maintaining a guarantee that no two offers may exist with the same
-       * agent ID.  If an offer exists with the same agent ID, the existing offer is removed
-       * and returned, and {@code offer} is not added.
-       *
-       * @param offer Offer to add.
-       * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
-       *         which will also be removed prior to returning.
-       */
-      synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
-        HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
-        if (sameAgent != null) {
-          remove(sameAgent.getOffer().getId());
-          return Optional.of(sameAgent);
-        }
-
-        addInternal(offer);
-        return Optional.absent();
-      }
-
-      private void addInternal(HostOffer offer) {
-        offers.add(offer);
-        offersById.put(offer.getOffer().getId(), offer);
-        offersBySlave.put(offer.getOffer().getAgentId(), offer);
-        offersByHost.put(offer.getOffer().getHostname(), offer);
-      }
-
-      synchronized boolean remove(OfferID id) {
-        HostOffer removed = offersById.remove(id);
-        if (removed != null) {
-          offers.remove(removed);
-          offersBySlave.remove(removed.getOffer().getAgentId());
-          offersByHost.remove(removed.getOffer().getHostname());
-        }
-        globallyBannedOffers.remove(id);
-        return removed != null;
-      }
-
-      synchronized void updateHostAttributes(IHostAttributes attributes) {
-        HostOffer offer = offersByHost.remove(attributes.getHost());
-        if (offer != null) {
-          // Remove and re-add a host's offer to re-sort based on its new hostStatus
-          remove(offer.getOffer().getId());
-          addInternal(new HostOffer(offer.getOffer(), attributes));
-        }
-      }
-
-      /**
-       * Returns an iterable giving the state of the offers at the time the method is called. Unlike
-       * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
-       * will not be modified outside of the returned iterable.
-       *
-       * @return The offers currently known by the scheduler.
-       */
-      synchronized Iterable<HostOffer> getOffers() {
-        return FluentIterable.from(offers).filter(
-            e -> !globallyBannedOffers.contains(e.getOffer().getId())
-        ).toSet();
-      }
-
-      /**
-       * Returns a weakly-consistent iterable giving the available offers to a given
-       * {@code groupKey}. This iterable can handle concurrent operations on its underlying
-       * collection, and may reflect changes that happen after the construction of the iterable.
-       * This property is mainly used in {@code launchTask}.
-       *
-       * @param groupKey The task group to get offers for.
-       * @return The offers a given task group can use.
-       */
-      synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
-        return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(e ->
-            staticallyBannedOffers.getIfPresent(Pair.of(e.getOffer().getId(), groupKey)) == null
-                && !globallyBannedOffers.contains(e.getOffer().getId())));
-      }
-
-      synchronized void addGlobalBan(OfferID offerId) {
-        globallyBannedOffers.add(offerId);
-      }
-
-      synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
-        if (offersById.containsKey(offerId)) {
-          staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
-        }
-      }
-
-      synchronized void clear() {
-        offers.clear();
-        offersById.clear();
-        offersBySlave.clear();
-        offersByHost.clear();
-        staticallyBannedOffers.invalidateAll();
-        globallyBannedOffers.clear();
-      }
-    }
-
-    @Override
-    public void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey) {
-      hostOffers.addStaticGroupBan(offerId, groupKey);
-    }
-
-    @Timed("offer_manager_launch_task")
-    @Override
-    public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
-      // Guard against an offer being removed after we grabbed it from the iterator.
-      // If that happens, the offer will not exist in hostOffers, and we can immediately
-      // send it back to LOST for quick reschedule.
-      // Removing while iterating counts on the use of a weakly-consistent iterator being used,
-      // which is a feature of ConcurrentSkipListSet.
-      if (hostOffers.remove(offerId)) {
-        try {
-          Operation launch = Operation.newBuilder()
-              .setType(Operation.Type.LAUNCH)
-              .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task))
-              .build();
-          driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
-        } catch (IllegalStateException e) {
-          // TODO(William Farner): Catch only the checked exception produced by Driver
-          // once it changes from throwing IllegalStateException when the driver is not yet
-          // registered.
-          throw new LaunchException("Failed to launch task.", e);
-        }
-      } else {
-        offerRaces.incrementAndGet();
-        throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
new file mode 100644
index 0000000..427b1b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+
+public class OfferManagerImpl implements OfferManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(org.apache.aurora.scheduler.offers.OfferManagerImpl.class);
+
+  @VisibleForTesting
+  static final String OFFER_ACCEPT_RACES = "offer_accept_races";
+  @VisibleForTesting
+  static final String OUTSTANDING_OFFERS = "outstanding_offers";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
+  @VisibleForTesting
+  static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
+  @VisibleForTesting
+  static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
+  @VisibleForTesting
+  static final String VETO_EVALUATED_OFFERS = "veto_evaluated_offers";
+
+  private final HostOffers hostOffers;
+  private final AtomicLong offerRaces;
+  private final AtomicLong offerCancelFailures;
+
+  private final Driver driver;
+  private final OfferSettings offerSettings;
+  private final Deferment offerDecline;
+
+  @Inject
+  @VisibleForTesting
+  public OfferManagerImpl(
+      Driver driver,
+      OfferSettings offerSettings,
+      StatsProvider statsProvider,
+      Deferment offerDecline,
+      SchedulingFilter schedulingFilter) {
+
+    this.driver = requireNonNull(driver);
+    this.offerSettings = requireNonNull(offerSettings);
+    this.hostOffers = new HostOffers(statsProvider, offerSettings, schedulingFilter);
+    this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
+    this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+    this.offerDecline = requireNonNull(offerDecline);
+  }
+
+  @Override
+  public void add(HostOffer offer) {
+    Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+    if (sameAgent.isPresent()) {
+      // We have an existing offer for the same agent.  We choose to return both offers so that
+      // they may be combined into a single offer.
+      LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
+          + " for compaction.");
+      decline(offer.getOffer().getId());
+      decline(sameAgent.get().getOffer().getId());
+    } else {
+      offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
+    }
+  }
+
+  private void removeAndDecline(Protos.OfferID id) {
+    if (removeFromHostOffers(id)) {
+      decline(id);
+    }
+  }
+
+  private void decline(Protos.OfferID id) {
+    LOG.debug("Declining offer {}", id);
+    driver.declineOffer(id, getOfferFilter());
+  }
+
+  private Protos.Filters getOfferFilter() {
+    return Protos.Filters.newBuilder()
+        .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
+        .build();
+  }
+
+  @Override
+  public boolean cancel(final Protos.OfferID offerId) {
+    boolean success = removeFromHostOffers(offerId);
+    if (!success) {
+      // This will happen rarely when we race to process this rescind against accepting the offer
+      // to launch a task.
+      // If it happens frequently, we are likely processing rescinds before the offer itself.
+      LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
+      this.offerCancelFailures.incrementAndGet();
+    }
+    return success;
+  }
+
+  private boolean removeFromHostOffers(final Protos.OfferID offerId) {
+    requireNonNull(offerId);
+
+    // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+    // on an offer, the master will mark the task as LOST and it will be retried.
+    return hostOffers.remove(offerId);
+  }
+
+  @Override
+  public void ban(Protos.OfferID offerId) {
+    hostOffers.addGlobalBan(offerId);
+  }
+
+  /**
+   * Updates the preference of a host's offers.
+   *
+   * @param change Host change notification.
+   */
+  @Subscribe
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
+    hostOffers.updateHostAttributes(change.getAttributes());
+  }
+
+  @Override
+  public Optional<HostOffer> get(Protos.AgentID slaveId) {
+    return hostOffers.get(slaveId);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAll() {
+    return hostOffers.getOffers();
+  }
+
+  @Override
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                         ResourceRequest resourceRequest,
+                                         boolean revocable) {
+
+    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                            ResourceRequest resourceRequest,
+                                            boolean revocable) {
+
+    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+  }
+
+  /**
+   * Notifies the queue that the driver is disconnected, and all the stored offers are now
+   * invalid.
+   * <p>
+   * The queue takes this as a signal to flush its queue.
+   *
+   * @param event Disconnected event.
+   */
+  @Subscribe
+  public void driverDisconnected(PubsubEvent.DriverDisconnected event) {
+    LOG.info("Clearing stale offers since the driver is disconnected.");
+    hostOffers.clear();
+  }
+
+  @Timed("offer_manager_launch_task")
+  @Override
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+    // Guard against an offer being removed after we grabbed it from the iterator.
+    // If that happens, the offer will not exist in hostOffers, and we can immediately
+    // send it back to LOST for quick reschedule.
+    // Removing while iterating counts on the use of a weakly-consistent iterator being used,
+    // which is a feature of ConcurrentSkipListSet.
+    if (hostOffers.remove(offerId)) {
+      try {
+        Protos.Offer.Operation launch = Protos.Offer.Operation.newBuilder()
+            .setType(Protos.Offer.Operation.Type.LAUNCH)
+            .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addTaskInfos(task))
+            .build();
+        driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
+      } catch (IllegalStateException e) {
+        // TODO(William Farner): Catch only the checked exception produced by Driver
+        // once it changes from throwing IllegalStateException when the driver is not yet
+        // registered.
+        throw new LaunchException("Failed to launch task.", e);
+      }
+    } else {
+      offerRaces.incrementAndGet();
+      throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
+    }
+  }
+
+  /**
+   * Get all static bans.
+   */
+  @VisibleForTesting
+  Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+    return hostOffers.getStaticBans();
+  }
+
+  /**
+   * Exclude an offer that results in a static mismatch from further attempts to match against all
+   * tasks from the same group.
+   */
+  @VisibleForTesting
+  void banForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    hostOffers.addStaticGroupBan(offerId, groupKey);
+  }
+
+  /**
+   * Used for testing to ensure that the underlying cache's `size` method returns an accurate
+   * value by not including evicted entries.
+   */
+  @VisibleForTesting
+  void cleanupStaticBans() {
+    hostOffers.cleanUpStaticallyBannedOffers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
new file mode 100644
index 0000000..e2e3628
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+
+import javax.inject.Qualifier;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Random;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Binding module for resource offer management.
+ */
+public class OfferManagerModule extends AbstractModule {
+  private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class);
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-hold_offers_forever",
+        description =
+            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+        arity = 1)
+    public boolean holdOffersForever = false;
+
+    @Parameter(names = "-min_offer_hold_time",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Minimum amount of time to hold a resource offer before declining.")
+    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
+
+    @Parameter(names = "-offer_hold_jitter_window",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Maximum amount of random jitter to add to the offer hold time window.")
+    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
+
+    @Parameter(names = "-offer_filter_duration",
+        description =
+            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
+                + "improves scheduling performance in smaller clusters, but might lead to resource "
+                + "starvation for other frameworks if you run many frameworks in your cluster.")
+    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
+
+    @Parameter(names = "-unavailability_threshold",
+        description =
+            "Threshold time, when running tasks should be drained from a host, before a host "
+                + "becomes unavailable. Should be greater than min_offer_hold_time + "
+                + "offer_hold_jitter_window.")
+    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
+
+    @Parameter(names = "-offer_order",
+        description =
+            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
+                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
+                + " then memory and finally would randomize any equal offers.")
+    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
+
+    @Parameter(names = "-offer_order_modules",
+        description = "Custom Guice module to provide an offer ordering.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
+
+    @Parameter(names = "-offer_static_ban_cache_max_size",
+        validateValueWith = NotNegativeNumber.class,
+        description =
+            "The number of offers to hold in the static ban cache. If no value is specified, "
+                + "the cache will grow indefinitely. However, entries will expire within "
+                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
+    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
+  }
+
+  /**
+   * Binding annotation for the threshold to veto tasks with unavailability.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface UnavailabilityThreshold { }
+
+  public static class OfferOrderModule extends AbstractModule {
+    private final CliOptions options;
+
+    public OfferOrderModule(CliOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    protected void configure() {
+      bind(new TypeLiteral<Ordering<HostOffer>>() { })
+          .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
+    }
+  }
+
+  private final CliOptions cliOptions;
+
+  public OfferManagerModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    Options options = cliOptions.offer;
+    if (!options.holdOffersForever) {
+      long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
+          + options.minOfferHoldTime.as(Time.SECONDS);
+      if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
+        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+                + " and offer_hold_jitter_window ({}). This creates risks of races between "
+                + "launching and draining",
+            options.unavailabilityThreshold,
+            options.minOfferHoldTime,
+            options.offerHoldJitterWindow);
+      }
+    }
+
+    for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
+      install(module);
+    }
+
+    bind(new TypeLiteral<Amount<Long, Time>>() { })
+        .annotatedWith(UnavailabilityThreshold.class)
+        .toInstance(options.unavailabilityThreshold);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (options.holdOffersForever) {
+          bind(Deferment.class).to(Deferment.Noop.class);
+        } else {
+          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+              new RandomJitterReturnDelay(
+                  options.minOfferHoldTime.as(Time.MILLISECONDS),
+                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+                  Random.Util.newDefaultRandom()));
+          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+        }
+
+        bind(OfferManager.class).to(OfferManagerImpl.class);
+        bind(OfferManagerImpl.class).in(Singleton.class);
+        expose(OfferManager.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
+  }
+
+  @Provides
+  @Singleton
+  OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
+    // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
+    // both maximum size of the cache and the length an offer is valid. We do this in order to
+    // satisfy requirements in both single- and multi-framework environments. If offers are held for
+    // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
+    // longest it will be valid for. Additionally, cluster operators will most likely not have to
+    // worry about cache size in this case as this behavior mimics current behavior. If offers are
+    // held indefinitely, then we never expire cache entries but the cluster operator can specify a
+    // maximum size to avoid a memory leak.
+    long maxOfferHoldTime;
+    if (cliOptions.offer.holdOffersForever) {
+      maxOfferHoldTime = Long.MAX_VALUE;
+    } else {
+      maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
+          + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
+    }
+
+    return new OfferSettings(
+        cliOptions.offer.offerFilterDuration,
+        offerOrdering,
+        Amount.of(maxOfferHoldTime, Time.SECONDS),
+        cliOptions.offer.offerStaticBanCacheMaxSize,
+        Ticker.systemTicker());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 57fc1a1..838a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Settings required to create an OfferManager.
  */
+@VisibleForTesting
 public class OfferSettings {
 
   private final Amount<Long, Time> filterDuration;
@@ -67,14 +68,14 @@ public class OfferSettings {
   /**
    * Duration after which we want Mesos to re-offer unused or declined resources.
    */
-  public Amount<Long, Time> getFilterDuration() {
+  Amount<Long, Time> getFilterDuration() {
     return filterDuration;
   }
 
   /**
    * The ordering to use when fetching offers from OfferManager.
    */
-  public Ordering<HostOffer> getOrdering() {
+  Ordering<HostOffer> getOrdering() {
     return ordering;
   }
 
@@ -82,7 +83,7 @@ public class OfferSettings {
    * The builder for the static ban cache. Cache settings (e.g. max size, entry expiration) should
    * already be added to the builder by this point.
    */
-  public CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
+  CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
     return staticBanCacheBuilder;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
deleted file mode 100644
index 4a6ea8d..0000000
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.offers;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-
-import javax.inject.Qualifier;
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.util.Random;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.app.MoreModules;
-import org.apache.aurora.scheduler.config.CliOptions;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Binding module for resource offer management.
- */
-public class OffersModule extends AbstractModule {
-  private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class);
-
-  @Parameters(separators = "=")
-  public static class Options {
-    @Parameter(names = "-hold_offers_forever",
-        description =
-            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
-        arity = 1)
-    public boolean holdOffersForever = false;
-
-    @Parameter(names = "-min_offer_hold_time",
-        validateValueWith = NotNegativeAmount.class,
-        description = "Minimum amount of time to hold a resource offer before declining.")
-    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
-
-    @Parameter(names = "-offer_hold_jitter_window",
-        validateValueWith = NotNegativeAmount.class,
-        description = "Maximum amount of random jitter to add to the offer hold time window.")
-    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
-
-    @Parameter(names = "-offer_filter_duration",
-        description =
-            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
-                + "improves scheduling performance in smaller clusters, but might lead to resource "
-                + "starvation for other frameworks if you run many frameworks in your cluster.")
-    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
-
-    @Parameter(names = "-unavailability_threshold",
-        description =
-            "Threshold time, when running tasks should be drained from a host, before a host "
-                + "becomes unavailable. Should be greater than min_offer_hold_time + "
-                + "offer_hold_jitter_window.")
-    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
-
-    @Parameter(names = "-offer_order",
-        description =
-            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
-                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
-                + " then memory and finally would randomize any equal offers.")
-    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
-
-    @Parameter(names = "-offer_order_modules",
-        description = "Custom Guice module to provide an offer ordering.")
-    @SuppressWarnings("rawtypes")
-    public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
-
-    @Parameter(names = "-offer_static_ban_cache_max_size",
-        validateValueWith = NotNegativeNumber.class,
-        description =
-            "The number of offers to hold in the static ban cache. If no value is specified, "
-                + "the cache will grow indefinitely. However, entries will expire within "
-                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
-    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
-  }
-
-  public static class OfferOrderModule extends AbstractModule {
-    private final CliOptions options;
-
-    public OfferOrderModule(CliOptions options) {
-      this.options = options;
-    }
-
-    @Override
-    protected void configure() {
-      bind(new TypeLiteral<Ordering<HostOffer>>() { })
-          .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
-    }
-  }
-
-  /**
-   * Binding annotation for the threshold to veto tasks with unavailability.
-   */
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface UnavailabilityThreshold { }
-
-  private final CliOptions cliOptions;
-
-  public OffersModule(CliOptions cliOptions) {
-    this.cliOptions = cliOptions;
-  }
-
-  @Override
-  protected void configure() {
-    Options options = cliOptions.offer;
-    if (!options.holdOffersForever) {
-      long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
-          + options.minOfferHoldTime.as(Time.SECONDS);
-      if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
-        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
-                + " and offer_hold_jitter_window ({}). This creates risks of races between "
-                + "launching and draining",
-            options.unavailabilityThreshold,
-            options.minOfferHoldTime,
-            options.offerHoldJitterWindow);
-      }
-    }
-
-    for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
-      install(module);
-    }
-
-    bind(new TypeLiteral<Amount<Long, Time>>() { })
-        .annotatedWith(UnavailabilityThreshold.class)
-        .toInstance(options.unavailabilityThreshold);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        if (options.holdOffersForever) {
-          bind(Deferment.class).to(Deferment.Noop.class);
-        } else {
-          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
-              new RandomJitterReturnDelay(
-                  options.minOfferHoldTime.as(Time.MILLISECONDS),
-                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
-                  Random.Util.newDefaultRandom()));
-          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
-        }
-
-        bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
-        bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
-        expose(OfferManager.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
-  }
-
-  @Provides
-  @Singleton
-  OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
-    // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
-    // both maximum size of the cache and the length an offer is valid. We do this in order to
-    // satisfy requirements in both single- and multi-framework environments. If offers are held for
-    // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
-    // longest it will be valid for. Additionally, cluster operators will most likely not have to
-    // worry about cache size in this case as this behavior mimics current behavior. If offers are
-    // held indefinitely, then we never expire cache entries but the cluster operator can specify a
-    // maximum size to avoid a memory leak.
-    long maxOfferHoldTime;
-    if (cliOptions.offer.holdOffersForever) {
-      maxOfferHoldTime = Long.MAX_VALUE;
-    } else {
-      maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
-          + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
-    }
-
-    return new OfferSettings(
-        cliOptions.offer.offerFilterDuration,
-        offerOrdering,
-        Amount.of(maxOfferHoldTime, Time.SECONDS),
-        cliOptions.offer.offerStaticBanCacheMaxSize,
-        Ticker.systemTicker());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index 766d3b2..497a766 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -143,7 +143,7 @@ public class PendingTaskProcessor implements Runnable {
 
       // Group the offers by slave id so they can be paired with active tasks from the same slave.
       Map<String, HostOffer> slavesToOffers =
-          Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+          Maps.uniqueIndex(offerManager.getAll(), OFFER_TO_SLAVE_ID);
 
       Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
           slavesToOffers.keySet(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 82a0ff6..ffb8b90 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -94,7 +94,7 @@ public interface Preemptor {
                 pendingTask.getTask(),
                 slot.getVictims(),
                 jobState,
-                offerManager.getOffer(slaveId),
+                offerManager.get(slaveId),
                 store);
 
         metrics.recordSlotValidationResult(validatedVictims);