You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/28 19:02:37 UTC
[3/3] aurora git commit: Enable custom offer scoring modules for task
assignment
Enable custom offer scoring modules for task assignment
Major portions of the refactor:
* Refactor `OfferManager` to do filtering of offers (added `getMatching` and
`getAllMatching` methods) as opposed to TaskAssigner
* Refactor `TaskAssigner`, allow for injection of custom "scoring" class
through `OfferSelector` interface
And some minor things as well:
* Moved `TaskAssignerImpl`, `TaskSchedulerImpl`, and `HostOffers` into their own
upper-level classes
* Moved `TaskAssigner` to the `scheduling` package and out of the `state` package
* Renamed some methods in `OfferManager` to avoid code stutter
* Renaming of some classes (e.g. `FirstFitTaskAssigner` -> `TaskAssignerImpl`)
* And a slew of others
Reviewed at https://reviews.apache.org/r/63973/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/80139da4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/80139da4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/80139da4
Branch: refs/heads/master
Commit: 80139da4624916e406c7e80c4ea2d286d4d859c3
Parents: 21af250
Author: Jordan Ly <jo...@gmail.com>
Authored: Tue Nov 28 11:02:14 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Nov 28 11:02:14 2017 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 3 +
docs/reference/scheduler-configuration.md | 2 +-
.../org/apache/aurora/benchmark/Offers.java | 2 +-
.../aurora/benchmark/SchedulingBenchmarks.java | 16 +-
.../benchmark/fakes/FakeOfferManager.java | 31 +-
.../apache/aurora/scheduler/app/AppModule.java | 4 +-
.../aurora/scheduler/config/CliOptions.java | 6 +-
.../scheduler/filter/AttributeAggregate.java | 38 +-
.../scheduler/filter/SchedulingFilter.java | 5 +
.../scheduler/filter/SchedulingFilterImpl.java | 2 +-
.../apache/aurora/scheduler/http/Offers.java | 2 +-
.../scheduler/mesos/MesosCallbackHandler.java | 12 +-
.../aurora/scheduler/offers/HostOffers.java | 253 +++++++++
.../aurora/scheduler/offers/OfferManager.java | 404 ++------------
.../scheduler/offers/OfferManagerImpl.java | 246 +++++++++
.../scheduler/offers/OfferManagerModule.java | 211 ++++++++
.../aurora/scheduler/offers/OfferSettings.java | 7 +-
.../aurora/scheduler/offers/OffersModule.java | 211 --------
.../preemptor/PendingTaskProcessor.java | 2 +-
.../aurora/scheduler/preemptor/Preemptor.java | 2 +-
.../scheduling/FirstFitOfferSelector.java | 29 +
.../scheduling/FirstFitOfferSelectorModule.java | 26 +
.../scheduler/scheduling/OfferSelector.java | 36 ++
.../scheduler/scheduling/SchedulingModule.java | 4 +-
.../scheduler/scheduling/TaskAssigner.java | 46 ++
.../scheduler/scheduling/TaskAssignerImpl.java | 284 ++++++++++
.../scheduling/TaskAssignerImplModule.java | 59 ++
.../scheduler/scheduling/TaskScheduler.java | 191 -------
.../scheduler/scheduling/TaskSchedulerImpl.java | 207 +++++++
.../state/FirstFitTaskAssignerModule.java | 31 --
.../aurora/scheduler/state/StateModule.java | 3 +-
.../aurora/scheduler/state/TaskAssigner.java | 338 ------------
.../scheduler/stats/AsyncStatsModule.java | 2 +-
.../scheduler/config/CommandLineTest.java | 4 +-
.../aurora/scheduler/http/OffersTest.java | 4 +-
.../mesos/MesosCallbackHandlerTest.java | 20 +-
.../scheduler/offers/OfferManagerImplTest.java | 381 +++++++++----
.../preemptor/PendingTaskProcessorTest.java | 2 +-
.../scheduler/preemptor/PreemptorImplTest.java | 2 +-
.../preemptor/PreemptorModuleTest.java | 2 +-
.../scheduling/FirstFitOfferSelectorTest.java | 66 +++
.../scheduling/TaskAssignerImplTest.java | 374 +++++++++++++
.../scheduling/TaskSchedulerImplTest.java | 2 -
.../state/FirstFitTaskAssignerTest.java | 539 -------------------
.../scheduler/stats/AsyncStatsModuleTest.java | 2 +-
45 files changed, 2244 insertions(+), 1869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2d3c423..54dcc75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -12,6 +12,9 @@
a production cluster. For that reason, the functionality is behind a new flag `-partition_aware`
that is disabled by default. When Mesos support is improved and the new behavior is vetted in
production clusters, we'll enable this by default.
+- Added the ability to "score" offers for a given scheduling assignment via the `OfferSelector`
+ interface. The default implementation is first fit, but cluster operators can inject a custom
+ scoring algorithm through the `-offer_selector_modules` flag.
### Deprecations and removals:
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 6c385b5..f697b6f 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -222,7 +222,7 @@ Optional flags:
Time for a stat to be retained in memory before expiring.
-stat_sampling_interval (default (1, secs))
Statistic value sampling interval.
--task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule])
+-task_assigner_modules (default [class org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule])
Guice modules for replacing task assignment logic.
-thermos_executor_cpu (default 0.25)
The number of CPU cores to allocate for each instance of the executor.
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index 2b46326..2fcc804 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -50,7 +50,7 @@ final class Offers {
*/
static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) {
for (HostOffer offer : offers) {
- offerManager.addOffer(offer);
+ offerManager.add(offer);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 1708a50..58e3224 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -62,15 +62,17 @@ import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.offers.Deferment;
import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.offers.OfferOrder;
import org.apache.aurora.scheduler.offers.OfferSettings;
-import org.apache.aurora.scheduler.offers.OffersModule;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.scheduling.TaskScheduler;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -147,8 +149,8 @@ public class SchedulingBenchmarks {
bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class)
.toInstance(new NoopExecutor());
bind(Deferment.class).to(Deferment.Noop.class);
- bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
- bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+ bind(OfferManager.class).to(OfferManagerImpl.class);
+ bind(OfferManagerImpl.class).in(Singleton.class);
bind(OfferSettings.class).toInstance(
new OfferSettings(NO_DELAY,
ImmutableList.of(OfferOrder.RANDOM),
@@ -157,8 +159,8 @@ public class SchedulingBenchmarks {
new FakeTicker()));
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
- bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
- bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+ bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+ bind(TaskSchedulerImpl.class).in(Singleton.class);
expose(TaskScheduler.class);
expose(OfferManager.class);
}
@@ -171,7 +173,7 @@ public class SchedulingBenchmarks {
.toInstance(DELAY_FOREVER);
bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
bind(new TypeLiteral<Amount<Long, Time>>() { })
- .annotatedWith(OffersModule.UnavailabilityThreshold.class)
+ .annotatedWith(OfferManagerModule.UnavailabilityThreshold.class)
.toInstance(Amount.of(1L, Time.MINUTES));
bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index 201aa81..05c58ab 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -18,22 +18,23 @@ import com.google.common.base.Optional;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.mesos.v1.Protos;
public class FakeOfferManager implements OfferManager {
@Override
- public void addOffer(HostOffer offer) {
+ public void add(HostOffer offer) {
// no-op
}
@Override
- public boolean cancelOffer(Protos.OfferID offerId) {
+ public boolean cancel(Protos.OfferID offerId) {
return false;
}
@Override
- public void banOffer(Protos.OfferID offerId) {
+ public void ban(Protos.OfferID offerId) {
// no-op
}
@@ -43,27 +44,33 @@ public class FakeOfferManager implements OfferManager {
}
@Override
- public void banOfferForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
// no-op
}
@Override
- public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
- return null;
+ public Optional<HostOffer> get(Protos.AgentID agentId) {
+ return Optional.absent();
}
@Override
- public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
- // no-op
+ public Iterable<HostOffer> getAll() {
+ return null;
}
@Override
- public Iterable<HostOffer> getOffers() {
- return null;
+ public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return Optional.absent();
}
@Override
- public Optional<HostOffer> getOffer(Protos.AgentID agentId) {
- return Optional.absent();
+ public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 3204cca..817a019 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,7 +47,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.http.JettyServerModule;
import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
import org.apache.aurora.scheduler.metadata.MetadataModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.pruning.PruningModule;
import org.apache.aurora.scheduler.quota.QuotaModule;
@@ -172,7 +172,7 @@ public class AppModule extends AbstractModule {
install(new PubsubEventModule());
install(new AsyncModule(options.async));
- install(new OffersModule(options));
+ install(new OfferManagerModule(options));
install(new PruningModule(options.pruning));
install(new ReconciliationModule(options.reconciliation));
install(new SchedulingModule(options.scheduling));
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index d4537e3..b7f43e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -36,12 +36,13 @@ import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule;
import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.pruning.PruningModule;
import org.apache.aurora.scheduler.reconciliation.ReconciliationModule;
import org.apache.aurora.scheduler.resources.ResourceSettings;
import org.apache.aurora.scheduler.scheduling.SchedulingModule;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
import org.apache.aurora.scheduler.sla.SlaModule;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.stats.AsyncStatsModule;
@@ -54,7 +55,7 @@ import org.apache.aurora.scheduler.updater.UpdaterModule;
public class CliOptions {
public final ReconciliationModule.Options reconciliation =
new ReconciliationModule.Options();
- public final OffersModule.Options offer = new OffersModule.Options();
+ public final OfferManagerModule.Options offer = new OfferManagerModule.Options();
public final ExecutorModule.Options executor = new ExecutorModule.Options();
public final AppModule.Options app = new AppModule.Options();
public final SchedulerMain.Options main = new SchedulerMain.Options();
@@ -84,6 +85,7 @@ public class CliOptions {
public final StatsModule.Options stats = new StatsModule.Options();
public final CronModule.Options cron = new CronModule.Options();
public final ResourceSettings resourceSettings = new ResourceSettings();
+ public final TaskAssignerImplModule.Options taskAssigner = new TaskAssignerImplModule.Options();
final List<Object> custom;
public CliOptions() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 60f141d..a5acafa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -49,19 +49,8 @@ public final class AttributeAggregate {
*/
private Supplier<Multiset<Pair<String, String>>> aggregate;
- private boolean isInitialized = false;
-
private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
- this.aggregate = Suppliers.memoize(
- () -> {
- initialize();
- return aggregate.get();
- }
- );
- }
-
- private void initialize() {
- isInitialized = true; // inlining this assignment yields a PMD false positive
+ this.aggregate = Suppliers.memoize(aggregate);
}
/**
@@ -123,21 +112,16 @@ public final class AttributeAggregate {
}
public void updateAttributeAggregate(IHostAttributes attributes) {
- // If the aggregate supplier has not been populated there is no need to update it here.
- // All tasks attributes will be picked up by the wrapped task query if executed at a
- // later point in time.
- if (isInitialized) {
- final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
- aggregate = Suppliers.memoize(
- () -> {
- ImmutableMultiset.Builder<Pair<String, String>> builder
- = new ImmutableMultiset.Builder<>();
- builder.addAll(previous.get());
- addAttributes(builder, attributes.getAttributes());
- return builder.build();
- }
- );
- }
+ final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
+ aggregate = Suppliers.memoize(
+ () -> {
+ ImmutableMultiset.Builder<Pair<String, String>> builder
+ = new ImmutableMultiset.Builder<>();
+ builder.addAll(previous.get());
+ addAttributes(builder, attributes.getAttributes());
+ return builder.build();
+ }
+ );
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 36608a9..a00c095 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -258,6 +259,10 @@ public interface SchedulingFilter {
this(offer, attributes, Optional.absent());
}
+ public UnusedResource(HostOffer offer, boolean revocable) {
+ this(offer.getResourceBag(revocable), offer.getAttributes(), offer.getUnavailabilityStart());
+ }
+
public UnusedResource(ResourceBag offer, IHostAttributes attributes, Optional<Instant> start) {
this.offer = offer;
this.attributes = attributes;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index df51d4c..41a0764 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold;
+import org.apache.aurora.scheduler.offers.OfferManagerModule.UnavailabilityThreshold;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IAttribute;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index f22ca6e..bb92cd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -59,7 +59,7 @@ public class Offers {
public Response getOffers() throws JsonProcessingException {
return Response.ok(
mapper.writeValueAsString(
- StreamSupport.stream(offerManager.getOffers().spliterator(), false)
+ StreamSupport.stream(offerManager.getAll().spliterator(), false)
.map(o -> o.getOffer())
.collect(Collectors.toList())))
.build();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index fd5874d..87e702f 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -40,7 +40,7 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -126,7 +126,7 @@ public interface MesosCallbackHandler {
Driver driver,
Clock clock,
MaintenanceController controller,
- @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
+ @OfferManagerModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
@PubsubEventModule.RegisteredEvents EventSink registeredEventSink) {
this(
@@ -226,7 +226,7 @@ public interface MesosCallbackHandler {
storeProvider.getAttributeStore().saveHostAttributes(attributes);
log.info("Received offer: {}", offer.getId().getValue());
offersReceived.incrementAndGet();
- offerManager.addOffer(new HostOffer(offer, attributes));
+ offerManager.add(new HostOffer(offer, attributes));
}
});
});
@@ -244,15 +244,15 @@ public interface MesosCallbackHandler {
// In this scenario, we want to ensure that we do not use it/accept it when the executor
// finally processes the offer. We will temporarily ban it and add a command for the
// executor to unban it so future offers can be processed normally.
- boolean offerCancelled = offerManager.cancelOffer(offerId);
+ boolean offerCancelled = offerManager.cancel(offerId);
if (!offerCancelled) {
log.info(
"Received rescind before adding offer: {}, temporarily banning.",
offerId.getValue());
- offerManager.banOffer(offerId);
+ offerManager.ban(offerId);
executor.execute(() -> {
log.info("Cancelling and unbanning offer: {}.", offerId.getValue());
- offerManager.cancelOffer(offerId);
+ offerManager.cancel(offerId);
});
}
offersRescinded.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
new file mode 100644
index 0000000..8adbcb1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A container for the data structures used by this {@link OfferManagerImpl}, to make it easier to
+ * reason about the different indices used and their consistency.
+ */
+class HostOffers {
+ private final Set<HostOffer> offers;
+
+ private final Map<Protos.OfferID, HostOffer> offersById = Maps.newHashMap();
+ private final Map<Protos.AgentID, HostOffer> offersBySlave = Maps.newHashMap();
+ private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+
+ // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+ // scheduling attempts. See VetoGroup for more details on static ban.
+ private final Cache<Pair<Protos.OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
+ private final SchedulingFilter schedulingFilter;
+
+ // Keep track of globally banned offers that will never be matched to anything.
+ private final Set<Protos.OfferID> globallyBannedOffers = Sets.newHashSet();
+
+ // Keep track of the number of offers evaluated for vetoes when getting matching offers
+ private final AtomicLong vetoEvaluatedOffers;
+
+ HostOffers(StatsProvider statsProvider,
+ OfferSettings offerSettings,
+ SchedulingFilter schedulingFilter) {
+ this.offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
+ this.staticallyBannedOffers = offerSettings
+ .getStaticBanCacheBuilder()
+ .build();
+ this.schedulingFilter = requireNonNull(schedulingFilter);
+
+ // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
+ // Could track this separately if it turns out to pose problems.
+ statsProvider.exportSize(OfferManagerImpl.OUTSTANDING_OFFERS, offers);
+ statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS,
+ staticallyBannedOffers::size);
+ statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS_HIT_RATE,
+ () -> staticallyBannedOffers.stats().hitRate());
+ statsProvider.makeGauge(OfferManagerImpl.GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
+
+ vetoEvaluatedOffers = statsProvider.makeCounter(OfferManagerImpl.VETO_EVALUATED_OFFERS);
+ }
+
+ /**
+ * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+ * agent ID. If an offer exists with the same agent ID, the existing offer is removed
+ * and returned, and {@code offer} is not added.
+ *
+ * @param offer Offer to add.
+ * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+ * which will also be removed prior to returning.
+ */
+ synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+ HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+ if (sameAgent != null) {
+ remove(sameAgent.getOffer().getId());
+ return Optional.of(sameAgent);
+ }
+
+ addInternal(offer);
+ return Optional.absent();
+ }
+
+ private void addInternal(HostOffer offer) {
+ offers.add(offer);
+ offersById.put(offer.getOffer().getId(), offer);
+ offersBySlave.put(offer.getOffer().getAgentId(), offer);
+ offersByHost.put(offer.getOffer().getHostname(), offer);
+ }
+
+ synchronized boolean remove(Protos.OfferID id) {
+ HostOffer removed = offersById.remove(id);
+ if (removed != null) {
+ offers.remove(removed);
+ offersBySlave.remove(removed.getOffer().getAgentId());
+ offersByHost.remove(removed.getOffer().getHostname());
+ }
+ globallyBannedOffers.remove(id);
+ return removed != null;
+ }
+
+ synchronized void addGlobalBan(Protos.OfferID offerId) {
+ globallyBannedOffers.add(offerId);
+ }
+
+ synchronized void updateHostAttributes(IHostAttributes attributes) {
+ HostOffer offer = offersByHost.remove(attributes.getHost());
+ if (offer != null) {
+ // Remove and re-add a host's offer to re-sort based on its new hostStatus
+ remove(offer.getOffer().getId());
+ addInternal(new HostOffer(offer.getOffer(), attributes));
+ }
+ }
+
+ synchronized Optional<HostOffer> get(Protos.AgentID slaveId) {
+ HostOffer offer = offersBySlave.get(slaveId);
+ if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
+ return Optional.absent();
+ }
+
+ return Optional.of(offer);
+ }
+
+ /**
+ * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+ * {@code getAllMatching}, the underlying collection is a copy of the original and
+ * will not be modified outside of the returned iterable.
+ *
+ * @return The offers currently known by the scheduler.
+ */
+ synchronized Iterable<HostOffer> getOffers() {
+ return FluentIterable.from(offers)
+ .filter(o -> !globallyBannedOffers.contains(o.getOffer().getId()))
+ .toSet();
+ }
+
+ synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ Optional<HostOffer> optionalOffer = get(slaveId);
+ if (optionalOffer.isPresent()) {
+ HostOffer offer = optionalOffer.get();
+
+ if (isGloballyBanned(offer)
+ || isVetoed(offer, resourceRequest, revocable, Optional.absent())) {
+
+ return Optional.absent();
+ }
+ }
+
+ return optionalOffer;
+ }
+
+ /**
+ * Returns a weakly-consistent iterable giving the available offers to a given
+ * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+ * collection, and may reflect changes that happen after the construction of the iterable.
+ * This property is mainly used in {@code launchTask}.
+ *
+ * @param groupKey The task group to get offers for.
+ * @return The offers a given task group can use.
+ */
+ synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return Iterables.unmodifiableIterable(FluentIterable.from(offers)
+ .filter(o -> !isGloballyBanned(o))
+ .filter(o -> !isStaticallyBanned(o, groupKey))
+ .filter(HostOffer::hasCpuAndMem)
+ .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+ }
+
+ private synchronized boolean isGloballyBanned(HostOffer offer) {
+ return globallyBannedOffers.contains(offer.getOffer().getId());
+ }
+
+ private synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
+ return staticallyBannedOffers.getIfPresent(Pair.of(offer.getOffer().getId(), groupKey)) != null;
+ }
+
+ /**
+ * Determine whether or not the {@link HostOffer} is vetoed for the given {@link ResourceRequest}.
+ * If {@code groupKey} is present and the vetoes are static, this method will also temporarily
+ * ban the offer from matching the {@link TaskGroupKey}.
+ */
+ private boolean isVetoed(HostOffer offer,
+ ResourceRequest resourceRequest,
+ boolean revocable,
+ Optional<TaskGroupKey> groupKey) {
+
+ vetoEvaluatedOffers.incrementAndGet();
+ UnusedResource unusedResource = new UnusedResource(offer, revocable);
+ Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
+ if (!vetoes.isEmpty()) {
+ if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {
+ addStaticGroupBan(offer.getOffer().getId(), groupKey.get());
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @VisibleForTesting
+ synchronized void addStaticGroupBan(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ if (offersById.containsKey(offerId)) {
+ staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
+ }
+ }
+
+ @VisibleForTesting
+ synchronized Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+ return staticallyBannedOffers.asMap().keySet();
+ }
+
+ synchronized void clear() {
+ offers.clear();
+ offersById.clear();
+ offersBySlave.clear();
+ offersByHost.clear();
+ staticallyBannedOffers.invalidateAll();
+ globallyBannedOffers.clear();
+ }
+
+ @VisibleForTesting
+ synchronized void cleanUpStaticallyBannedOffers() {
+ staticallyBannedOffers.cleanUp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 96b0f46..0349215 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -13,41 +13,16 @@
*/
package org.apache.aurora.scheduler.offers;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-import org.apache.aurora.common.collections.Pair;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.Offer.Operation;
import org.apache.mesos.v1.Protos.OfferID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -61,7 +36,7 @@ public interface OfferManager extends EventSubscriber {
*
* @param offer Newly-available resource offer.
*/
- void addOffer(HostOffer offer);
+ void add(HostOffer offer);
/**
* Invalidates an offer. This indicates that the scheduler should not attempt to match any
@@ -70,62 +45,70 @@ public interface OfferManager extends EventSubscriber {
* @param offerId Cancelled offer.
* @return A boolean on whether or not the offer was successfully cancelled.
*/
- boolean cancelOffer(OfferID offerId);
+ boolean cancel(OfferID offerId);
/**
* Exclude an offer from being matched against all tasks.
*
* @param offerId Offer ID to ban.
*/
- void banOffer(OfferID offerId);
+ void ban(OfferID offerId);
/**
- * Exclude an offer that results in a static mismatch from further attempts to match against all
- * tasks from the same group.
+ * Notifies the offer queue that a host's attributes have changed.
*
- * @param offerId Offer ID to exclude for the given {@code groupKey}.
- * @param groupKey Task group key to exclude.
+ * @param change State change notification.
*/
- void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey);
+ void hostAttributesChanged(HostAttributesChanged change);
/**
- * Launches the task matched against the offer.
+ * Gets the offer for the given slave ID.
*
- * @param offerId Matched offer ID.
- * @param task Matched task info.
- * @throws LaunchException If there was an error launching the task.
+ * @param slaveId Slave ID to get the offer for.
+ * @return The offer for the slave ID.
*/
- void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
+ Optional<HostOffer> get(AgentID slaveId);
/**
- * Notifies the offer queue that a host's attributes have changed.
+ * Gets all offers that the scheduler is holding, excluding banned offers.
*
- * @param change State change notification.
+ * @return A snapshot of the offers that the scheduler is currently holding.
*/
- void hostAttributesChanged(HostAttributesChanged change);
+ Iterable<HostOffer> getAll();
/**
- * Gets the offers that the scheduler is holding, excluding banned offers.
+ * Gets the offer for the given slave ID if it satisfies the supplied {@link ResourceRequest}.
*
- * @return A snapshot of the offers that the scheduler is currently holding.
+ * @param slaveId Slave ID to get the offer for.
+ * @param resourceRequest The request that the offer should satisfy.
+ * @param revocable Whether or not the request can use revocable resources.
+ * @return An option containing the offer for the slave ID if it fits.
*/
- Iterable<HostOffer> getOffers();
+ Optional<HostOffer> getMatching(AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable);
/**
- * Gets all offers that are not banned for the given {@code groupKey}.
+ * Gets all offers that the scheduler is holding that satisfy the supplied
+ * {@link ResourceRequest}.
*
- * @param groupKey Task group key to check offers for.
- * @return A snapshot of all offers eligible for the given {@code groupKey}.
+ * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
+ * @param resourceRequest The request that the offer should satisfy.
+ * @param revocable Whether or not the request can use revocable resources.
+ * @return All offers the scheduler is holding that satisfy the request.
*/
- Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+ Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable);
/**
- * Gets an offer for the given slave ID.
+ * Launches the task matched against the offer.
*
- * @param slaveId Slave ID to get offer for.
- * @return An offer for the slave ID.
+ * @param offerId Matched offer ID.
+ * @param task Matched task info.
+ * @throws LaunchException If there was an error launching the task.
*/
- Optional<HostOffer> getOffer(AgentID slaveId);
+ void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
/**
* Thrown when there was an unexpected failure trying to launch a task.
@@ -140,319 +123,4 @@ public interface OfferManager extends EventSubscriber {
super(msg, cause);
}
}
-
- class OfferManagerImpl implements OfferManager {
- @VisibleForTesting
- static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class);
- @VisibleForTesting
- static final String OFFER_ACCEPT_RACES = "offer_accept_races";
- @VisibleForTesting
- static final String OUTSTANDING_OFFERS = "outstanding_offers";
- @VisibleForTesting
- static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
- @VisibleForTesting
- static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
- @VisibleForTesting
- static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
- @VisibleForTesting
- static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
-
- private final HostOffers hostOffers;
- private final AtomicLong offerRaces;
- private final AtomicLong offerCancelFailures;
-
- private final Driver driver;
- private final OfferSettings offerSettings;
- private final Deferment offerDecline;
-
- @Inject
- @VisibleForTesting
- public OfferManagerImpl(
- Driver driver,
- OfferSettings offerSettings,
- StatsProvider statsProvider,
- Deferment offerDecline) {
-
- this.driver = requireNonNull(driver);
- this.offerSettings = requireNonNull(offerSettings);
- this.hostOffers = new HostOffers(statsProvider, offerSettings);
- this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
- this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
- this.offerDecline = requireNonNull(offerDecline);
- }
-
- @Override
- public void addOffer(HostOffer offer) {
- Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
- if (sameAgent.isPresent()) {
- // We have an existing offer for the same agent. We choose to return both offers so that
- // they may be combined into a single offer.
- LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
- + " for compaction.");
- decline(offer.getOffer().getId());
- decline(sameAgent.get().getOffer().getId());
- } else {
- offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
- }
- }
-
- private void removeAndDecline(OfferID id) {
- if (removeFromHostOffers(id)) {
- decline(id);
- }
- }
-
- private void decline(OfferID id) {
- LOG.debug("Declining offer {}", id);
- driver.declineOffer(id, getOfferFilter());
- }
-
- private Protos.Filters getOfferFilter() {
- return Protos.Filters.newBuilder()
- .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
- .build();
- }
-
- @Override
- public boolean cancelOffer(final OfferID offerId) {
- boolean success = removeFromHostOffers(offerId);
- if (!success) {
- // This will happen rarely when we race to process this rescind against accepting the offer
- // to launch a task.
- // If it happens frequently, we are likely processing rescinds before the offer itself.
- LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
- this.offerCancelFailures.incrementAndGet();
- }
- return success;
- }
-
- @Override
- public void banOffer(OfferID offerId) {
- hostOffers.addGlobalBan(offerId);
- }
-
- private boolean removeFromHostOffers(final OfferID offerId) {
- requireNonNull(offerId);
-
- // The small risk of inconsistency is acceptable here - if we have an accept/remove race
- // on an offer, the master will mark the task as LOST and it will be retried.
- return hostOffers.remove(offerId);
- }
-
- @Override
- public Iterable<HostOffer> getOffers() {
- return hostOffers.getOffers();
- }
-
- @Override
- public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
- return hostOffers.getWeaklyConsistentOffers(groupKey);
- }
-
- @Override
- public Optional<HostOffer> getOffer(AgentID slaveId) {
- return hostOffers.get(slaveId);
- }
-
- /**
- * Updates the preference of a host's offers.
- *
- * @param change Host change notification.
- */
- @Subscribe
- public void hostAttributesChanged(HostAttributesChanged change) {
- hostOffers.updateHostAttributes(change.getAttributes());
- }
-
- /**
- * Notifies the queue that the driver is disconnected, and all the stored offers are now
- * invalid.
- * <p>
- * The queue takes this as a signal to flush its queue.
- *
- * @param event Disconnected event.
- */
- @Subscribe
- public void driverDisconnected(DriverDisconnected event) {
- LOG.info("Clearing stale offers since the driver is disconnected.");
- hostOffers.clear();
- }
-
- /**
- * Used for testing to ensure that the underlying cache's `size` method returns an accurate
- * value by not including evicted entries.
- */
- @VisibleForTesting
- public void cleanupStaticBans() {
- hostOffers.staticallyBannedOffers.cleanUp();
- }
-
- /**
- * A container for the data structures used by this class, to make it easier to reason about
- * the different indices used and their consistency.
- */
- private static class HostOffers {
-
- private final Set<HostOffer> offers;
- private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
- private final Map<AgentID, HostOffer> offersBySlave = Maps.newHashMap();
- private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
-
- // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
- // scheduling attempts. See VetoGroup for more details on static ban.
- private final Cache<Pair<OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
-
- // Keep track of globally banned offers that will never be matched to anything.
- private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
-
- HostOffers(StatsProvider statsProvider, OfferSettings offerSettings) {
- offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
- staticallyBannedOffers = offerSettings
- .getStaticBanCacheBuilder()
- .build();
- // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
- // Could track this separately if it turns out to pose problems.
- statsProvider.exportSize(OUTSTANDING_OFFERS, offers);
- statsProvider.makeGauge(STATICALLY_BANNED_OFFERS, staticallyBannedOffers::size);
- statsProvider.makeGauge(STATICALLY_BANNED_OFFERS_HIT_RATE,
- () -> staticallyBannedOffers.stats().hitRate());
- statsProvider.makeGauge(GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
- }
-
- synchronized Optional<HostOffer> get(AgentID slaveId) {
- HostOffer offer = offersBySlave.get(slaveId);
- if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
- return Optional.absent();
- }
-
- return Optional.of(offer);
- }
-
- /**
- * Adds an offer while maintaining a guarantee that no two offers may exist with the same
- * agent ID. If an offer exists with the same agent ID, the existing offer is removed
- * and returned, and {@code offer} is not added.
- *
- * @param offer Offer to add.
- * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
- * which will also be removed prior to returning.
- */
- synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
- HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
- if (sameAgent != null) {
- remove(sameAgent.getOffer().getId());
- return Optional.of(sameAgent);
- }
-
- addInternal(offer);
- return Optional.absent();
- }
-
- private void addInternal(HostOffer offer) {
- offers.add(offer);
- offersById.put(offer.getOffer().getId(), offer);
- offersBySlave.put(offer.getOffer().getAgentId(), offer);
- offersByHost.put(offer.getOffer().getHostname(), offer);
- }
-
- synchronized boolean remove(OfferID id) {
- HostOffer removed = offersById.remove(id);
- if (removed != null) {
- offers.remove(removed);
- offersBySlave.remove(removed.getOffer().getAgentId());
- offersByHost.remove(removed.getOffer().getHostname());
- }
- globallyBannedOffers.remove(id);
- return removed != null;
- }
-
- synchronized void updateHostAttributes(IHostAttributes attributes) {
- HostOffer offer = offersByHost.remove(attributes.getHost());
- if (offer != null) {
- // Remove and re-add a host's offer to re-sort based on its new hostStatus
- remove(offer.getOffer().getId());
- addInternal(new HostOffer(offer.getOffer(), attributes));
- }
- }
-
- /**
- * Returns an iterable giving the state of the offers at the time the method is called. Unlike
- * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
- * will not be modified outside of the returned iterable.
- *
- * @return The offers currently known by the scheduler.
- */
- synchronized Iterable<HostOffer> getOffers() {
- return FluentIterable.from(offers).filter(
- e -> !globallyBannedOffers.contains(e.getOffer().getId())
- ).toSet();
- }
-
- /**
- * Returns a weakly-consistent iterable giving the available offers to a given
- * {@code groupKey}. This iterable can handle concurrent operations on its underlying
- * collection, and may reflect changes that happen after the construction of the iterable.
- * This property is mainly used in {@code launchTask}.
- *
- * @param groupKey The task group to get offers for.
- * @return The offers a given task group can use.
- */
- synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
- return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(e ->
- staticallyBannedOffers.getIfPresent(Pair.of(e.getOffer().getId(), groupKey)) == null
- && !globallyBannedOffers.contains(e.getOffer().getId())));
- }
-
- synchronized void addGlobalBan(OfferID offerId) {
- globallyBannedOffers.add(offerId);
- }
-
- synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
- if (offersById.containsKey(offerId)) {
- staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
- }
- }
-
- synchronized void clear() {
- offers.clear();
- offersById.clear();
- offersBySlave.clear();
- offersByHost.clear();
- staticallyBannedOffers.invalidateAll();
- globallyBannedOffers.clear();
- }
- }
-
- @Override
- public void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey) {
- hostOffers.addStaticGroupBan(offerId, groupKey);
- }
-
- @Timed("offer_manager_launch_task")
- @Override
- public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
- // Guard against an offer being removed after we grabbed it from the iterator.
- // If that happens, the offer will not exist in hostOffers, and we can immediately
- // send it back to LOST for quick reschedule.
- // Removing while iterating counts on the use of a weakly-consistent iterator being used,
- // which is a feature of ConcurrentSkipListSet.
- if (hostOffers.remove(offerId)) {
- try {
- Operation launch = Operation.newBuilder()
- .setType(Operation.Type.LAUNCH)
- .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task))
- .build();
- driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
- } catch (IllegalStateException e) {
- // TODO(William Farner): Catch only the checked exception produced by Driver
- // once it changes from throwing IllegalStateException when the driver is not yet
- // registered.
- throw new LaunchException("Failed to launch task.", e);
- }
- } else {
- offerRaces.incrementAndGet();
- throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
new file mode 100644
index 0000000..427b1b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+
+public class OfferManagerImpl implements OfferManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(org.apache.aurora.scheduler.offers.OfferManagerImpl.class);
+
+ @VisibleForTesting
+ static final String OFFER_ACCEPT_RACES = "offer_accept_races";
+ @VisibleForTesting
+ static final String OUTSTANDING_OFFERS = "outstanding_offers";
+ @VisibleForTesting
+ static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
+ @VisibleForTesting
+ static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
+ @VisibleForTesting
+ static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
+ @VisibleForTesting
+ static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
+ @VisibleForTesting
+ static final String VETO_EVALUATED_OFFERS = "veto_evaluated_offers";
+
+ private final HostOffers hostOffers;
+ private final AtomicLong offerRaces;
+ private final AtomicLong offerCancelFailures;
+
+ private final Driver driver;
+ private final OfferSettings offerSettings;
+ private final Deferment offerDecline;
+
+ @Inject
+ @VisibleForTesting
+ public OfferManagerImpl(
+ Driver driver,
+ OfferSettings offerSettings,
+ StatsProvider statsProvider,
+ Deferment offerDecline,
+ SchedulingFilter schedulingFilter) {
+
+ this.driver = requireNonNull(driver);
+ this.offerSettings = requireNonNull(offerSettings);
+ this.hostOffers = new HostOffers(statsProvider, offerSettings, schedulingFilter);
+ this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
+ this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+ this.offerDecline = requireNonNull(offerDecline);
+ }
+
+ @Override
+ public void add(HostOffer offer) {
+ Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+ if (sameAgent.isPresent()) {
+ // We have an existing offer for the same agent. We choose to return both offers so that
+ // they may be combined into a single offer.
+ LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
+ + " for compaction.");
+ decline(offer.getOffer().getId());
+ decline(sameAgent.get().getOffer().getId());
+ } else {
+ offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
+ }
+ }
+
+ private void removeAndDecline(Protos.OfferID id) {
+ if (removeFromHostOffers(id)) {
+ decline(id);
+ }
+ }
+
+ private void decline(Protos.OfferID id) {
+ LOG.debug("Declining offer {}", id);
+ driver.declineOffer(id, getOfferFilter());
+ }
+
+ private Protos.Filters getOfferFilter() {
+ return Protos.Filters.newBuilder()
+ .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
+ .build();
+ }
+
+ @Override
+ public boolean cancel(final Protos.OfferID offerId) {
+ boolean success = removeFromHostOffers(offerId);
+ if (!success) {
+ // This will happen rarely when we race to process this rescind against accepting the offer
+ // to launch a task.
+ // If it happens frequently, we are likely processing rescinds before the offer itself.
+ LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
+ this.offerCancelFailures.incrementAndGet();
+ }
+ return success;
+ }
+
+ private boolean removeFromHostOffers(final Protos.OfferID offerId) {
+ requireNonNull(offerId);
+
+ // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+ // on an offer, the master will mark the task as LOST and it will be retried.
+ return hostOffers.remove(offerId);
+ }
+
+ @Override
+ public void ban(Protos.OfferID offerId) {
+ hostOffers.addGlobalBan(offerId);
+ }
+
+ /**
+ * Updates the preference of a host's offers.
+ *
+ * @param change Host change notification.
+ */
+ @Subscribe
+ public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
+ hostOffers.updateHostAttributes(change.getAttributes());
+ }
+
+ @Override
+ public Optional<HostOffer> get(Protos.AgentID slaveId) {
+ return hostOffers.get(slaveId);
+ }
+
+ @Override
+ public Iterable<HostOffer> getAll() {
+ return hostOffers.getOffers();
+ }
+
+ @Override
+ public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+ }
+
+ @Override
+ public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+ }
+
+ /**
+ * Notifies the queue that the driver is disconnected, and all the stored offers are now
+ * invalid.
+ * <p>
+ * The queue takes this as a signal to flush its queue.
+ *
+ * @param event Disconnected event.
+ */
+ @Subscribe
+ public void driverDisconnected(PubsubEvent.DriverDisconnected event) {
+ LOG.info("Clearing stale offers since the driver is disconnected.");
+ hostOffers.clear();
+ }
+
+ @Timed("offer_manager_launch_task")
+ @Override
+ public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+ // Guard against an offer being removed after we grabbed it from the iterator.
+ // If that happens, the offer will not exist in hostOffers, and we can immediately
+ // send it back to LOST for quick reschedule.
+ // Removing while iterating counts on the use of a weakly-consistent iterator being used,
+ // which is a feature of ConcurrentSkipListSet.
+ if (hostOffers.remove(offerId)) {
+ try {
+ Protos.Offer.Operation launch = Protos.Offer.Operation.newBuilder()
+ .setType(Protos.Offer.Operation.Type.LAUNCH)
+ .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addTaskInfos(task))
+ .build();
+ driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
+ } catch (IllegalStateException e) {
+ // TODO(William Farner): Catch only the checked exception produced by Driver
+ // once it changes from throwing IllegalStateException when the driver is not yet
+ // registered.
+ throw new LaunchException("Failed to launch task.", e);
+ }
+ } else {
+ offerRaces.incrementAndGet();
+ throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
+ }
+ }
+
+ /**
+ * Gets all static bans.
+ */
+ @VisibleForTesting
+ Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+ return hostOffers.getStaticBans();
+ }
+
+ /**
+ * Exclude an offer that results in a static mismatch from further attempts to match against all
+ * tasks from the same group.
+ */
+ @VisibleForTesting
+ void banForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ hostOffers.addStaticGroupBan(offerId, groupKey);
+ }
+
+ /**
+ * Used for testing to ensure that the underlying cache's `size` method returns an accurate
+ * value by not including evicted entries.
+ */
+ @VisibleForTesting
+ void cleanupStaticBans() {
+ hostOffers.cleanUpStaticallyBannedOffers();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
new file mode 100644
index 0000000..e2e3628
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+
+import javax.inject.Qualifier;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Random;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Binding module for resource offer management.
+ */
+public class OfferManagerModule extends AbstractModule {
+ private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class);
+
+ @Parameters(separators = "=")
+ public static class Options {
+ @Parameter(names = "-hold_offers_forever",
+ description =
+ "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+ arity = 1)
+ public boolean holdOffersForever = false;
+
+ @Parameter(names = "-min_offer_hold_time",
+ validateValueWith = NotNegativeAmount.class,
+ description = "Minimum amount of time to hold a resource offer before declining.")
+ public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
+
+ @Parameter(names = "-offer_hold_jitter_window",
+ validateValueWith = NotNegativeAmount.class,
+ description = "Maximum amount of random jitter to add to the offer hold time window.")
+ public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
+
+ @Parameter(names = "-offer_filter_duration",
+ description =
+ "Duration after which we expect Mesos to re-offer unused resources. A short duration "
+ + "improves scheduling performance in smaller clusters, but might lead to resource "
+ + "starvation for other frameworks if you run many frameworks in your cluster.")
+ public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
+
+ @Parameter(names = "-unavailability_threshold",
+ description =
+ "Threshold time, when running tasks should be drained from a host, before a host "
+ + "becomes unavailable. Should be greater than min_offer_hold_time + "
+ + "offer_hold_jitter_window.")
+ public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
+
+ @Parameter(names = "-offer_order",
+ description =
+ "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
+ + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
+ + " then memory and finally would randomize any equal offers.")
+ public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
+
+ @Parameter(names = "-offer_order_modules",
+ description = "Custom Guice module to provide an offer ordering.")
+ @SuppressWarnings("rawtypes")
+ public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
+
+ @Parameter(names = "-offer_static_ban_cache_max_size",
+ validateValueWith = NotNegativeNumber.class,
+ description =
+ "The number of offers to hold in the static ban cache. If no value is specified, "
+ + "the cache will grow indefinitely. However, entries will expire within "
+ + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
+ public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
+ }
+
+ /**
+ * Binding annotation for the threshold to veto tasks with unavailability.
+ */
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ public @interface UnavailabilityThreshold { }
+
+ public static class OfferOrderModule extends AbstractModule {
+ private final CliOptions options;
+
+ public OfferOrderModule(CliOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ protected void configure() {
+ bind(new TypeLiteral<Ordering<HostOffer>>() { })
+ .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
+ }
+ }
+
+ private final CliOptions cliOptions;
+
+ public OfferManagerModule(CliOptions cliOptions) {
+ this.cliOptions = cliOptions;
+ }
+
+ @Override
+ protected void configure() {
+ Options options = cliOptions.offer;
+ if (!options.holdOffersForever) {
+ long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
+ + options.minOfferHoldTime.as(Time.SECONDS);
+ if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
+ LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+ + " and offer_hold_jitter_window ({}). This creates risks of races between "
+ + "launching and draining",
+ options.unavailabilityThreshold,
+ options.minOfferHoldTime,
+ options.offerHoldJitterWindow);
+ }
+ }
+
+ for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
+ install(module);
+ }
+
+ bind(new TypeLiteral<Amount<Long, Time>>() { })
+ .annotatedWith(UnavailabilityThreshold.class)
+ .toInstance(options.unavailabilityThreshold);
+
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ if (options.holdOffersForever) {
+ bind(Deferment.class).to(Deferment.Noop.class);
+ } else {
+ bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+ new RandomJitterReturnDelay(
+ options.minOfferHoldTime.as(Time.MILLISECONDS),
+ options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+ Random.Util.newDefaultRandom()));
+ bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+ }
+
+ bind(OfferManager.class).to(OfferManagerImpl.class);
+ bind(OfferManagerImpl.class).in(Singleton.class);
+ expose(OfferManager.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
+ }
+
+ @Provides
+ @Singleton
+ OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
+ // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
+ // both maximum size of the cache and the length an offer is valid. We do this in order to
+ // satisfy requirements in both single- and multi-framework environments. If offers are held for
+ // a finite duration, then we can expire cache entries after maxOfferHoldTime since that is the
+ // longest it will be valid for. Additionally, cluster operators will most likely not have to
+ // worry about cache size in this case as this behavior mimics current behavior. If offers are
+ // held indefinitely, then we never expire cache entries but the cluster operator can specify a
+ // maximum size to avoid a memory leak.
+ long maxOfferHoldTime;
+ if (cliOptions.offer.holdOffersForever) {
+ maxOfferHoldTime = Long.MAX_VALUE;
+ } else {
+ maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
+ + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
+ }
+
+ return new OfferSettings(
+ cliOptions.offer.offerFilterDuration,
+ offerOrdering,
+ Amount.of(maxOfferHoldTime, Time.SECONDS),
+ cliOptions.offer.offerStaticBanCacheMaxSize,
+ Ticker.systemTicker());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 57fc1a1..838a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull;
/**
* Settings required to create an OfferManager.
*/
+@VisibleForTesting
public class OfferSettings {
private final Amount<Long, Time> filterDuration;
@@ -67,14 +68,14 @@ public class OfferSettings {
/**
* Duration after which we want Mesos to re-offer unused or declined resources.
*/
- public Amount<Long, Time> getFilterDuration() {
+ Amount<Long, Time> getFilterDuration() {
return filterDuration;
}
/**
* The ordering to use when fetching offers from OfferManager.
*/
- public Ordering<HostOffer> getOrdering() {
+ Ordering<HostOffer> getOrdering() {
return ordering;
}
@@ -82,7 +83,7 @@ public class OfferSettings {
* The builder for the static ban cache. Cache settings (e.g. max size, entry expiration) should
* already be added to the builder by this point.
*/
- public CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
+ CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
return staticBanCacheBuilder;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
deleted file mode 100644
index 4a6ea8d..0000000
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.offers;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-
-import javax.inject.Qualifier;
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.util.Random;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.app.MoreModules;
-import org.apache.aurora.scheduler.config.CliOptions;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Binding module for resource offer management.
- */
-public class OffersModule extends AbstractModule {
- private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class);
-
- @Parameters(separators = "=")
- public static class Options {
- @Parameter(names = "-hold_offers_forever",
- description =
- "Hold resource offers indefinitely, disabling automatic offer decline settings.",
- arity = 1)
- public boolean holdOffersForever = false;
-
- @Parameter(names = "-min_offer_hold_time",
- validateValueWith = NotNegativeAmount.class,
- description = "Minimum amount of time to hold a resource offer before declining.")
- public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
-
- @Parameter(names = "-offer_hold_jitter_window",
- validateValueWith = NotNegativeAmount.class,
- description = "Maximum amount of random jitter to add to the offer hold time window.")
- public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
-
- @Parameter(names = "-offer_filter_duration",
- description =
- "Duration after which we expect Mesos to re-offer unused resources. A short duration "
- + "improves scheduling performance in smaller clusters, but might lead to resource "
- + "starvation for other frameworks if you run many frameworks in your cluster.")
- public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
-
- @Parameter(names = "-unavailability_threshold",
- description =
- "Threshold time, when running tasks should be drained from a host, before a host "
- + "becomes unavailable. Should be greater than min_offer_hold_time + "
- + "offer_hold_jitter_window.")
- public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
-
- @Parameter(names = "-offer_order",
- description =
- "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
- + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
- + " then memory and finally would randomize any equal offers.")
- public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
-
- @Parameter(names = "-offer_order_modules",
- description = "Custom Guice module to provide an offer ordering.")
- @SuppressWarnings("rawtypes")
- public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
-
- @Parameter(names = "-offer_static_ban_cache_max_size",
- validateValueWith = NotNegativeNumber.class,
- description =
- "The number of offers to hold in the static ban cache. If no value is specified, "
- + "the cache will grow indefinitely. However, entries will expire within "
- + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
- public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
- }
-
- public static class OfferOrderModule extends AbstractModule {
- private final CliOptions options;
-
- public OfferOrderModule(CliOptions options) {
- this.options = options;
- }
-
- @Override
- protected void configure() {
- bind(new TypeLiteral<Ordering<HostOffer>>() { })
- .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
- }
- }
-
- /**
- * Binding annotation for the threshold to veto tasks with unavailability.
- */
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface UnavailabilityThreshold { }
-
- private final CliOptions cliOptions;
-
- public OffersModule(CliOptions cliOptions) {
- this.cliOptions = cliOptions;
- }
-
- @Override
- protected void configure() {
- Options options = cliOptions.offer;
- if (!options.holdOffersForever) {
- long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
- + options.minOfferHoldTime.as(Time.SECONDS);
- if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
- LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
- + " and offer_hold_jitter_window ({}). This creates risks of races between "
- + "launching and draining",
- options.unavailabilityThreshold,
- options.minOfferHoldTime,
- options.offerHoldJitterWindow);
- }
- }
-
- for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
- install(module);
- }
-
- bind(new TypeLiteral<Amount<Long, Time>>() { })
- .annotatedWith(UnavailabilityThreshold.class)
- .toInstance(options.unavailabilityThreshold);
-
- install(new PrivateModule() {
- @Override
- protected void configure() {
- if (options.holdOffersForever) {
- bind(Deferment.class).to(Deferment.Noop.class);
- } else {
- bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
- new RandomJitterReturnDelay(
- options.minOfferHoldTime.as(Time.MILLISECONDS),
- options.offerHoldJitterWindow.as(Time.MILLISECONDS),
- Random.Util.newDefaultRandom()));
- bind(Deferment.class).to(Deferment.DelayedDeferment.class);
- }
-
- bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
- bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
- expose(OfferManager.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
- }
-
- @Provides
- @Singleton
- OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
- // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
- // both maximum size of the cache and the length an offer is valid. We do this in order to
- // satisfy requirements in both single- and multi-framework environments. If offers are held for
- // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
- // longest it will be valid for. Additionally, cluster operators will most likely not have to
- // worry about cache size in this case as this behavior mimics current behavior. If offers are
- // held indefinitely, then we never expire cache entries but the cluster operator can specify a
- // maximum size to avoid a memory leak.
- long maxOfferHoldTime;
- if (cliOptions.offer.holdOffersForever) {
- maxOfferHoldTime = Long.MAX_VALUE;
- } else {
- maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
- + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
- }
-
- return new OfferSettings(
- cliOptions.offer.offerFilterDuration,
- offerOrdering,
- Amount.of(maxOfferHoldTime, Time.SECONDS),
- cliOptions.offer.offerStaticBanCacheMaxSize,
- Ticker.systemTicker());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index 766d3b2..497a766 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -143,7 +143,7 @@ public class PendingTaskProcessor implements Runnable {
// Group the offers by slave id so they can be paired with active tasks from the same slave.
Map<String, HostOffer> slavesToOffers =
- Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+ Maps.uniqueIndex(offerManager.getAll(), OFFER_TO_SLAVE_ID);
Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
slavesToOffers.keySet(),
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 82a0ff6..ffb8b90 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -94,7 +94,7 @@ public interface Preemptor {
pendingTask.getTask(),
slot.getVictims(),
jobState,
- offerManager.getOffer(slaveId),
+ offerManager.get(slaveId),
store);
metrics.recordSlotValidationResult(validatedVictims);