You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/10/25 01:12:02 UTC
git commit: Cache the host's maintenance status with offer.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 669981d5f -> 3ad0c5fb1
Cache the host's maintenance status with offer.
Bugs closed: AURORA-878
Reviewed at https://reviews.apache.org/r/27100/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/3ad0c5fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/3ad0c5fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/3ad0c5fb
Branch: refs/heads/master
Commit: 3ad0c5fb1c0a705bd2bc11c7a56218146eccc117
Parents: 669981d
Author: Zameer Manji <zm...@twopensource.com>
Authored: Fri Oct 24 16:11:31 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Oct 24 16:11:31 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/async/OfferQueue.java | 95 +++++++-----
.../aurora/scheduler/async/Preemptor.java | 56 +++++--
.../aurora/scheduler/async/TaskScheduler.java | 15 +-
.../events/NotifyingSchedulingFilter.java | 4 +-
.../scheduler/filter/SchedulingFilter.java | 3 +
.../scheduler/filter/SchedulingFilterImpl.java | 17 ++-
.../apache/aurora/scheduler/http/Offers.java | 9 +-
.../aurora/scheduler/state/TaskAssigner.java | 17 ++-
.../scheduler/stats/AsyncStatsModule.java | 15 +-
.../scheduler/async/OfferQueueImplTest.java | 25 ++-
.../scheduler/async/PreemptorImplTest.java | 43 ++++--
.../scheduler/async/TaskSchedulerImplTest.java | 36 ++---
.../scheduler/async/TaskSchedulerTest.java | 99 ++++++------
.../events/NotifyingSchedulingFilterTest.java | 10 +-
.../filter/SchedulingFilterImplTest.java | 153 +++++++++++--------
.../scheduler/state/TaskAssignerImplTest.java | 9 +-
16 files changed, 365 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index 92c8438..1a45d08 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -28,7 +28,6 @@ import javax.inject.Inject;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -48,6 +47,8 @@ import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;
+import static java.util.Objects.requireNonNull;
+
import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
@@ -82,7 +83,7 @@ public interface OfferQueue extends EventSubscriber {
* @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
* task.
*/
- boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
+ boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor) throws LaunchException;
/**
* Notifies the offer queue that a host has changed state.
@@ -96,7 +97,7 @@ public interface OfferQueue extends EventSubscriber {
*
* @return A snapshot of the offers that the scheduler is currently holding.
*/
- Iterable<Offer> getOffers();
+ Iterable<HostOffer> getOffers();
/**
* Calculates the amount of time before an offer should be 'returned' by declining it.
@@ -119,6 +120,51 @@ public interface OfferQueue extends EventSubscriber {
}
}
+ /**
+ * Encapsulate an offer from a host, and the host's maintenance mode.
+ */
+ class HostOffer {
+ private final Offer offer;
+
+ // TODO(wfarner): Replace this with HostAttributes for more use of this caching.
+ private final MaintenanceMode mode;
+
+ public HostOffer(Offer offer, MaintenanceMode mode) {
+ this.offer = requireNonNull(offer);
+ this.mode = requireNonNull(mode);
+ }
+
+ public Offer getOffer() {
+ return offer;
+ }
+
+ public MaintenanceMode getMode() {
+ return mode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HostOffer)) {
+ return false;
+ }
+ HostOffer other = (HostOffer) o;
+ return Objects.equals(offer, other.offer) && mode == other.mode;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(offer, mode);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("offer", offer)
+ .add("mode", mode)
+ .toString();
+ }
+ }
+
class OfferQueueImpl implements OfferQueue {
private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
@@ -187,7 +233,7 @@ public interface OfferQueue extends EventSubscriber {
}
private boolean removeFromHostOffers(final OfferID offerId) {
- Objects.requireNonNull(offerId);
+ requireNonNull(offerId);
// The small risk of inconsistency is acceptable here - if we have an accept/remove race
// on an offer, the master will mark the task as LOST and it will be retried.
@@ -195,14 +241,8 @@ public interface OfferQueue extends EventSubscriber {
}
@Override
- public Iterable<Offer> getOffers() {
- return FluentIterable.from(hostOffers.getWeaklyConsistentOffers())
- .transform(new Function<HostOffer, Offer>() {
- @Override
- public Offer apply(HostOffer offer) {
- return offer.offer;
- }
- });
+ public Iterable<HostOffer> getOffers() {
+ return hostOffers.getWeaklyConsistentOffers();
}
/**
@@ -230,33 +270,6 @@ public interface OfferQueue extends EventSubscriber {
}
/**
- * Encapsulate an offer from a host, and the host's maintenance mode.
- */
- private static class HostOffer {
- private final Offer offer;
- private final MaintenanceMode mode;
-
- HostOffer(Offer offer, MaintenanceMode mode) {
- this.offer = offer;
- this.mode = mode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof HostOffer)) {
- return false;
- }
- HostOffer other = (HostOffer) o;
- return Objects.equals(offer, other.offer) && mode == other.mode;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(offer, mode);
- }
- }
-
- /**
* A container for the data structures used by this class, to make it easier to reason about
* the different indices used and their consistency.
*/
@@ -326,14 +339,14 @@ public interface OfferQueue extends EventSubscriber {
}
@Override
- public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
+ public boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor)
throws LaunchException {
// It's important that this method is not called concurrently - doing so would open up the
// possibility of a race between the same offers being accepted by different threads.
for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) {
- Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
+ Optional<TaskInfo> assignment = acceptor.apply(hostOffer);
if (assignment.isPresent()) {
// Guard against an offer being removed after we grabbed it from the iterator.
// If that happens, the offer will not exist in hostOffers, and we can immediately
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index e3e261d..a17738e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -42,17 +42,18 @@ import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -62,6 +63,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
/**
@@ -128,6 +130,7 @@ public interface Preemptor {
private final SchedulingFilter schedulingFilter;
private final Amount<Long, Time> preemptionCandidacyDelay;
private final Clock clock;
+ private final MaintenanceController maintenance;
/**
* Creates a new preemptor.
@@ -147,7 +150,8 @@ public interface Preemptor {
OfferQueue offerQueue,
SchedulingFilter schedulingFilter,
@PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
- Clock clock) {
+ Clock clock,
+ MaintenanceController maintenance) {
this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
@@ -155,6 +159,7 @@ public interface Preemptor {
this.schedulingFilter = requireNonNull(schedulingFilter);
this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
this.clock = requireNonNull(clock);
+ this.maintenance = requireNonNull(maintenance);
}
private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
@@ -192,19 +197,27 @@ public interface Preemptor {
}
};
- private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
- new Function<Offer, ResourceSlot>() {
+ private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+ new Function<HostOffer, ResourceSlot>() {
@Override
- public ResourceSlot apply(Offer offer) {
- return ResourceSlot.from(offer);
+ public ResourceSlot apply(HostOffer hostOffer) {
+ return ResourceSlot.from(hostOffer.getOffer());
}
};
- private static final Function<Offer, String> OFFER_TO_HOST =
- new Function<Offer, String>() {
+ private static final Function<HostOffer, String> OFFER_TO_HOST =
+ new Function<HostOffer, String>() {
@Override
- public String apply(Offer offer) {
- return offer.getHostname();
+ public String apply(HostOffer hostOffer) {
+ return hostOffer.getOffer().getHostname();
+ }
+ };
+
+ private static final Function<HostOffer, MaintenanceMode> OFFER_TO_MODE =
+ new Function<HostOffer, MaintenanceMode>() {
+ @Override
+ public MaintenanceMode apply(HostOffer hostOffer) {
+ return hostOffer.getMode();
}
};
@@ -220,7 +233,7 @@ public interface Preemptor {
*/
private Optional<Set<IAssignedTask>> getTasksToPreempt(
Iterable<IAssignedTask> possibleVictims,
- Iterable<Offer> offers,
+ Iterable<HostOffer> offers,
IAssignedTask pendingTask,
AttributeAggregate attributeAggregate) {
@@ -236,9 +249,19 @@ public interface Preemptor {
ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
if (!Iterables.isEmpty(offers)) {
+ if (Iterables.size(offers) > 1) {
+ // There are multiple offers for the same host, each carrying its own maintenance mode.
+ // Rather than pick one, we do not preempt on this host now and instead wait for Mesos to
+ // merge the offers for us.
+ return Optional.absent();
+ }
+ MaintenanceMode mode =
+ Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet());
+
Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
slackResources,
host,
+ mode,
pendingTask.getTask(),
pendingTask.getTaskId(),
attributeAggregate);
@@ -269,6 +292,7 @@ public interface Preemptor {
Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
totalResource,
host,
+ maintenance.getMode(host),
pendingTask.getTask(),
pendingTask.getTaskId(),
attributeAggregate);
@@ -280,11 +304,11 @@ public interface Preemptor {
return Optional.absent();
}
- private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
- new Function<Offer, String>() {
+ private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+ new Function<HostOffer, String>() {
@Override
- public String apply(Offer offer) {
- return offer.getSlaveId().getValue();
+ public String apply(HostOffer hostOffer) {
+ return hostOffer.getOffer().getSlaveId().getValue();
}
};
@@ -323,7 +347,7 @@ public interface Preemptor {
attemptedPreemptions.incrementAndGet();
// Group the offers by slave id so they can be paired with active tasks from the same slave.
- Multimap<String, Offer> slavesToOffers =
+ Multimap<String, HostOffer> slavesToOffers =
Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
Set<String> allSlaves = ImmutableSet.<String>builder()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 1189ed0..5739c39 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -54,7 +54,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;
@@ -66,6 +65,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
/**
* Enables scheduling and preemption of tasks.
@@ -126,26 +126,27 @@ public interface TaskScheduler extends EventSubscriber {
this.reservations = new Reservations(reservationDuration, clock);
}
- private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+ private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
final AttributeAggregate attributeAggregate,
final String taskId,
final IScheduledTask task) {
- return new Function<Offer, Optional<TaskInfo>>() {
+ return new Function<HostOffer, Optional<TaskInfo>>() {
@Override
- public Optional<TaskInfo> apply(Offer offer) {
- Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
+ public Optional<TaskInfo> apply(HostOffer hostOffer) {
+ Optional<String> reservedTaskId =
+ reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId());
if (reservedTaskId.isPresent()) {
if (taskId.equals(reservedTaskId.get())) {
// Slave is reserved to satisfy this task.
- return assigner.maybeAssign(offer, task, attributeAggregate);
+ return assigner.maybeAssign(hostOffer, task, attributeAggregate);
} else {
// Slave is reserved for another task.
return Optional.absent();
}
} else {
// Slave is not reserved.
- return assigner.maybeAssign(offer, task, attributeAggregate);
+ return assigner.maybeAssign(hostOffer, task, attributeAggregate);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index 283f7e1..fc17cac 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,6 +20,7 @@ import java.util.Set;
import javax.inject.Inject;
import javax.inject.Qualifier;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -60,11 +61,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
public Set<Veto> filter(
ResourceSlot offer,
String slaveHost,
+ MaintenanceMode mode,
ITaskConfig task,
String taskId,
AttributeAggregate jobState) {
- Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
+ Set<Veto> vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState);
if (!vetoes.isEmpty()) {
eventSink.post(new Vetoed(taskId, vetoes));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 1e3018e..c37272c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -18,6 +18,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -105,6 +106,7 @@ public interface SchedulingFilter {
*
* @param offer Resources offered.
* @param slaveHost Host that the resources are associated with.
+ * @param mode Maintenance mode of the host that the resources are associated with.
* @param task Task.
* @param taskId Canonical ID of the task.
* @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
@@ -114,6 +116,7 @@ public interface SchedulingFilter {
Set<Veto> filter(
ResourceSlot offer,
String slaveHost,
+ MaintenanceMode mode,
ITaskConfig task,
String taskId,
AttributeAggregate attributeAggregate);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index da29428..0533baa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -33,7 +33,6 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -67,18 +66,15 @@ public class SchedulingFilterImpl implements SchedulingFilter {
private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
private final Storage storage;
- private final MaintenanceController maintenance;
/**
* Creates a new scheduling filter.
*
* @param storage Interface to accessing the task store.
- * @param maintenance Interface to accessing the maintenance controller
*/
@Inject
- public SchedulingFilterImpl(Storage storage, MaintenanceController maintenance) {
+ public SchedulingFilterImpl(Storage storage) {
this.storage = requireNonNull(storage);
- this.maintenance = requireNonNull(maintenance);
}
/**
@@ -230,8 +226,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
};
}
- private Optional<Veto> getMaintenanceVeto(String slaveHost) {
- MaintenanceMode mode = maintenance.getMode(slaveHost);
+ private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
return VETO_MODES.contains(mode)
? Optional.of(ConstraintFilter.maintenanceVeto(mode.toString().toLowerCase()))
: NO_VETO;
@@ -261,6 +256,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
public Set<Veto> filter(
ResourceSlot offer,
String slaveHost,
+ MaintenanceMode mode,
ITaskConfig task,
String taskId,
AttributeAggregate attributeAggregate) {
@@ -268,10 +264,15 @@ public class SchedulingFilterImpl implements SchedulingFilter {
if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
return ImmutableSet.of(DEDICATED_HOST_VETO);
}
+
+ Optional<Veto> maintenanceVeto = getMaintenanceVeto(mode);
+ if (maintenanceVeto.isPresent()) {
+ return maintenanceVeto.asSet();
+ }
+
return ImmutableSet.<Veto>builder()
.addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
.addAll(getResourceVetoes(offer, task))
- .addAll(getMaintenanceVeto(slaveHost).asSet())
.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index b7dfeda..446dc74 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -34,6 +34,8 @@ import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.Value.Range;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
/**
* Servlet that exposes resource offers that the scheduler is currently retaining.
*/
@@ -119,10 +121,11 @@ public class Offers {
return FluentIterable.from(iterable).transform(transform).toList();
}
- private static final Function<Offer, Map<String, ?>> TO_BEAN =
- new Function<Offer, Map<String, ?>>() {
+ private static final Function<HostOffer, Map<String, ?>> TO_BEAN =
+ new Function<HostOffer, Map<String, ?>>() {
@Override
- public Map<String, ?> apply(Offer offer) {
+ public Map<String, ?> apply(HostOffer hostOffer) {
+ Offer offer = hostOffer.getOffer();
return ImmutableMap.<String, Object>builder()
.put("id", offer.getId().getValue())
.put("framework_id", offer.getFrameworkId().getValue())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 4db9be8..78a9670 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -34,6 +34,8 @@ import org.apache.mesos.Protos.TaskInfo;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
/**
* Responsible for matching a task against an offer.
*/
@@ -43,13 +45,13 @@ public interface TaskAssigner {
* Tries to match a task against an offer. If a match is found, the assigner should
* make the appropriate changes to the task and provide a non-empty result.
*
- * @param offer The resource offer.
+ * @param hostOffer The resource offer.
* @param task The task to match against and optionally assign.
* @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
* @return Instructions for launching the task if matching and assignment were successful.
*/
Optional<TaskInfo> maybeAssign(
- Offer offer,
+ HostOffer hostOffer,
IScheduledTask task,
AttributeAggregate attributeAggregate);
@@ -87,20 +89,21 @@ public interface TaskAssigner {
@Override
public Optional<TaskInfo> maybeAssign(
- Offer offer,
+ HostOffer hostOffer,
IScheduledTask task,
AttributeAggregate attributeAggregate) {
Set<Veto> vetoes = filter.filter(
- ResourceSlot.from(offer),
- offer.getHostname(),
+ ResourceSlot.from(hostOffer.getOffer()),
+ hostOffer.getOffer().getHostname(),
+ hostOffer.getMode(),
task.getAssignedTask().getTask(),
Tasks.id(task),
attributeAggregate);
if (vetoes.isEmpty()) {
- return Optional.of(assign(offer, task));
+ return Optional.of(assign(hostOffer.getOffer(), task));
} else {
- LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
+ LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+ ": " + vetoes);
return Optional.absent();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index fc0f496..844a38a 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -42,7 +42,6 @@ import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.mesos.Protos.Offer;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -50,6 +49,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
/**
* Module to configure export of cluster-wide resource allocation and consumption statistics.
*/
@@ -111,16 +112,16 @@ public class AsyncStatsModule extends AbstractModule {
}
static class OfferAdapter implements MachineResourceProvider {
- private static final Function<Offer, MachineResource> TO_RESOURCE =
- new Function<Offer, MachineResource>() {
+ private static final Function<HostOffer, MachineResource> TO_RESOURCE =
+ new Function<HostOffer, MachineResource>() {
@Override
- public MachineResource apply(Offer offer) {
- Resources resources = Resources.from(offer);
+ public MachineResource apply(HostOffer hostOffer) {
+ Resources resources = Resources.from(hostOffer.getOffer());
IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
.setNumCpus(resources.getNumCpus())
.setRamMb(resources.getRam().as(Data.MB))
.setDiskMb(resources.getDisk().as(Data.MB)));
- return new MachineResource(quota, Conversions.isDedicated(offer));
+ return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer()));
}
};
@@ -133,7 +134,7 @@ public class AsyncStatsModule extends AbstractModule {
@Override
public Iterable<MachineResource> get() {
- Iterable<Offer> offers = offerQueue.getOffers();
+ Iterable<HostOffer> offers = offerQueue.getOffers();
return FluentIterable.from(offers).transform(TO_RESOURCE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index ddd24c3..15fb7ff 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -42,6 +42,7 @@ import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertFalse;
@@ -59,7 +60,7 @@ public class OfferQueueImplTest extends EasyMockTest {
private ScheduledExecutorService executor;
private ExecutorService testExecutor;
private MaintenanceController maintenanceController;
- private Function<Offer, Optional<TaskInfo>> offerAcceptor;
+ private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
private OfferQueueImpl offerQueue;
@Before
@@ -76,7 +77,7 @@ public class OfferQueueImplTest extends EasyMockTest {
}
});
maintenanceController = createMock(MaintenanceController.class);
- offerAcceptor = createMock(new Clazz<Function<Offer, Optional<TaskInfo>>>() { });
+ offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
OfferReturnDelay returnDelay = new OfferReturnDelay() {
@Override
public Amount<Integer, Time> get() {
@@ -127,12 +128,20 @@ public class OfferQueueImplTest extends EasyMockTest {
@Test
public void testOffersSorted() throws Exception {
- expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE);
- expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.DRAINING);
- expect(maintenanceController.getMode(HOST_C)).andReturn(MaintenanceMode.NONE);
- expect(offerAcceptor.apply(OFFER_A)).andReturn(Optional.<TaskInfo>absent());
- expect(offerAcceptor.apply(OFFER_C)).andReturn(Optional.<TaskInfo>absent());
- expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.<TaskInfo>absent());
+ MaintenanceMode modeA = MaintenanceMode.NONE;
+ MaintenanceMode modeB = MaintenanceMode.DRAINING;
+ MaintenanceMode modeC = MaintenanceMode.NONE;
+
+ HostOffer hostOfferA = new HostOffer(OFFER_A, modeA);
+ HostOffer hostOfferB = new HostOffer(OFFER_B, modeB);
+ HostOffer hostOfferC = new HostOffer(OFFER_C, modeC);
+
+ expect(maintenanceController.getMode(HOST_A)).andReturn(modeA);
+ expect(maintenanceController.getMode(HOST_B)).andReturn(modeB);
+ expect(maintenanceController.getMode(HOST_C)).andReturn(modeC);
+ expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.<TaskInfo>absent());
+ expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.<TaskInfo>absent());
+ expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.<TaskInfo>absent());
control.replay();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 18c21e3..c0fa462 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -18,6 +18,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Suppliers;
import com.google.common.collect.FluentIterable;
@@ -36,6 +37,7 @@ import org.apache.aurora.gen.Constraint;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
@@ -68,6 +70,7 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import static org.apache.mesos.Protos.Offer;
@@ -124,7 +127,8 @@ public class PreemptorImplTest extends EasyMockTest {
offerQueue,
schedulingFilter,
PREEMPTION_DELAY,
- clock);
+ clock,
+ maintenance);
preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
}
@@ -146,6 +150,10 @@ public class PreemptorImplTest extends EasyMockTest {
IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
}
+ private void expectGetMaintenance(String host) {
+ expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE);
+ }
+
@Test
public void testPreempted() throws Exception {
schedulingFilter = createMock(SchedulingFilter.class);
@@ -160,6 +168,8 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(highPriority);
expectGetActiveTasks(lowPriority);
+ expectGetMaintenance(HOST_A);
+
expectFiltering();
expectPreempted(lowPriority);
@@ -184,6 +194,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(highPriority);
expectGetActiveTasks(lowerPriority, lowerPriority);
+ expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(lowerPriority);
@@ -211,6 +222,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(pendingPriority);
expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
+ expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(lowestPriority);
@@ -251,6 +263,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(a1);
+ expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(a1);
@@ -273,6 +286,7 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(a1);
+ expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(a1);
@@ -301,7 +315,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures a production task can preempt 2 tasks on the same host.
@Test
public void testProductionPreemptingManyNonProduction() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
@@ -333,7 +347,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we select the minimal number of tasks to preempt
@Test
public void testMinimalSetPreempted() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
@@ -368,7 +382,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures a production task *never* preempts a production task from another job.
@Test
public void testProductionJobNeverPreemptsProductionJob() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
@@ -393,7 +407,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures that we can preempt if a task + offer can satisfy a pending task.
@Test
public void testPreemptWithOfferAndTask() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
setUpHost(HOST_A, RACK_A);
@@ -421,7 +435,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we can preempt if two tasks and an offer can satisfy a pending task.
@Test
public void testPreemptWithOfferAndMultipleTasks() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
setUpHost(HOST_A, RACK_A);
@@ -454,7 +468,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we don't preempt if a host has enough slack to satisfy a pending task.
@Test
public void testPreemptWithLargeOffer() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+ schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
setUpHost(HOST_A, RACK_A);
@@ -509,7 +523,8 @@ public class PreemptorImplTest extends EasyMockTest {
offerQueue,
schedulingFilter,
PREEMPTION_DELAY,
- clock);
+ clock,
+ maintenance);
assertEquals(
Optional.<String>absent(),
@@ -537,17 +552,25 @@ public class PreemptorImplTest extends EasyMockTest {
}
private void expectOffers(Offer ... offers) {
- expect(offerQueue.getOffers()).andReturn(Lists.newArrayList(offers));
+ Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
+ .transform(new Function<Offer, HostOffer>() {
+ @Override
+ public HostOffer apply(Offer offer) {
+ return new HostOffer(offer, MaintenanceMode.NONE);
+ }
+ });
+ expect(offerQueue.getOffers()).andReturn(hostOffers);
}
private void expectNoOffers() {
- expect(offerQueue.getOffers()).andReturn(ImmutableList.<Offer>of());
+ expect(offerQueue.getOffers()).andReturn(ImmutableList.<HostOffer>of());
}
private IExpectationSetters<Set<Veto>> expectFiltering() {
return expect(schedulingFilter.filter(
EasyMock.<ResourceSlot>anyObject(),
EasyMock.eq(HOST_A),
+ EasyMock.eq(MaintenanceMode.NONE),
EasyMock.<ITaskConfig>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.eq(emptyJob))).andAnswer(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index bb0b3b2..5d12df9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -29,6 +29,7 @@ import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -47,7 +48,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.TaskInfo;
import org.easymock.Capture;
import org.junit.Before;
@@ -55,6 +55,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -65,7 +66,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private static final IScheduledTask TASK_A = makeTask("a");
private static final IScheduledTask TASK_B = makeTask("b");
- private static final Offer OFFER = Offers.makeOffer("OFFER_A", "HOST_A");
+ private static final HostOffer OFFER =
+ new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
private StorageTestUtil storageUtil;
private StateManager stateManager;
@@ -135,16 +137,16 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectLaunchAttempt(false);
// Reserve "a" with offerA
expect(preemptor.findPreemptionSlotFor("a", emptyJob))
- .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+ .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
expectTaskStillPendingQuery(TASK_B);
expectActiveJobFetch(TASK_B);
- Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
expect(preemptor.findPreemptionSlotFor("b", emptyJob)).andReturn(Optional.<String>absent());
expectTaskStillPendingQuery(TASK_B);
expectActiveJobFetch(TASK_B);
- Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
expectAssigned(TASK_B);
control.replay();
@@ -170,17 +172,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectLaunchAttempt(false);
// Reserve "a" with offerA
expect(preemptor.findPreemptionSlotFor("a", emptyJob))
- .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+ .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
expectAssigned(TASK_A);
expectTaskStillPendingQuery(TASK_B);
expectActiveJobFetch(TASK_B);
- Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
expect(assigner.maybeAssign(OFFER, TASK_B, emptyJob))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
@@ -203,11 +205,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectLaunchAttempt(false);
// Reserve "a" with offerA
expect(preemptor.findPreemptionSlotFor("a", emptyJob))
- .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+ .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
expectAssigned(TASK_A);
control.replay();
@@ -228,11 +230,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
// Reserve "a" with offerA
expect(preemptor.findPreemptionSlotFor("a", emptyJob))
- .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+ .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
expectTaskStillPendingQuery(TASK_B);
expectActiveJobFetch(TASK_B);
- Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
expectAssigned(TASK_B);
control.replay();
@@ -253,11 +255,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectLaunchAttempt(false);
// Reserve "b" with offer1
expect(preemptor.findPreemptionSlotFor("b", emptyJob))
- .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+ .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
expectAssigned(TASK_A);
control.replay();
@@ -291,7 +293,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
});
- Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+ Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
expect(assigner.maybeAssign(OFFER, taskA, emptyJob))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
@@ -313,9 +315,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
.setEnvironment("env-" + taskId))));
}
- private Capture<Function<Offer, Optional<TaskInfo>>> expectLaunchAttempt(boolean taskLaunched)
+ private Capture<Function<HostOffer, Optional<TaskInfo>>> expectLaunchAttempt(boolean taskLaunched)
throws OfferQueue.LaunchException {
- Capture<Function<Offer, Optional<TaskInfo>>> assignment = createCapture();
+ Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = createCapture();
expect(offerQueue.launchFirst(capture(assignment))).andReturn(taskLaunched);
return assignment;
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index ec60880..fe57375 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -74,6 +74,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -88,10 +89,14 @@ public class TaskSchedulerTest extends EasyMockTest {
private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
- private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", "HOST_A");
- private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", "HOST_B");
- private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", "HOST_C");
- private static final Offer OFFER_D = Offers.makeOffer("OFFER_D", "HOST_D");
+ private static final HostOffer OFFER_A =
+ new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
+ private static final HostOffer OFFER_B =
+ new HostOffer(Offers.makeOffer("OFFER_B", "HOST_B"), MaintenanceMode.SCHEDULED);
+ private static final HostOffer OFFER_C =
+ new HostOffer(Offers.makeOffer("OFFER_C", "HOST_C"), MaintenanceMode.DRAINING);
+ private static final HostOffer OFFER_D =
+ new HostOffer(Offers.makeOffer("OFFER_D", "HOST_D"), MaintenanceMode.DRAINED);
private Storage storage;
@@ -204,8 +209,8 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
- offerQueue.addOffer(OFFER_B);
+ offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_B.getOffer());
}
@Test
@@ -283,7 +288,7 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
- driver.launchTask(OFFER_A.getId(), mesosTask);
+ driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
@@ -291,7 +296,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
changeState(task, INIT, PENDING);
timeoutCapture.getValue().run();
timeoutCapture2.getValue().run();
@@ -314,7 +319,7 @@ public class TaskSchedulerTest extends EasyMockTest {
expectAnyMaintenanceCalls();
expectOfferDeclineIn(10);
expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
- driver.launchTask(OFFER_A.getId(), mesosTask);
+ driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
expect(stateManager.changeState(
"a",
@@ -326,7 +331,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
timeoutCapture.getValue().run();
}
@@ -347,13 +352,13 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
- driver.launchTask(OFFER_A.getId(), mesosTask);
+ driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall();
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
timeoutCapture.getValue().run();
timeoutCapture2.getValue().run();
}
@@ -368,14 +373,14 @@ public class TaskSchedulerTest extends EasyMockTest {
expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
- driver.declineOffer(OFFER_A.getId());
+ driver.declineOffer(OFFER_A.getOffer().getId());
expectTaskGroupBackoff(10, 20);
expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
timeoutCapture.getValue().run();
offerExpirationCapture.getValue().run();
timeoutCapture2.getValue().run();
@@ -387,14 +392,14 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
Offer offerAB =
- Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getSlaveId()).build();
+ Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build();
- driver.declineOffer(OFFER_A.getId());
+ driver.declineOffer(OFFER_A.getOffer().getId());
driver.declineOffer(offerAB.getId());
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
offerQueue.addOffer(offerAB);
offerExpirationCapture.getValue().run();
}
@@ -404,15 +409,15 @@ public class TaskSchedulerTest extends EasyMockTest {
expectAnyMaintenanceCalls();
Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
- Function<Offer, Optional<TaskInfo>> offerAcceptor =
- createMock(new Clazz<Function<Offer, Optional<TaskInfo>>>() { });
+ Function<HostOffer, Optional<TaskInfo>> offerAcceptor =
+ createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
expect(offerAcceptor.apply(OFFER_A)).andReturn(Optional.of(taskInfo));
- driver.launchTask(OFFER_A.getId(), taskInfo);
+ driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
offerQueue.launchFirst(offerAcceptor);
offerExpirationCapture.getValue().run();
}
@@ -420,32 +425,32 @@ public class TaskSchedulerTest extends EasyMockTest {
@Test
public void testBasicMaintenancePreferences() {
expectOffer();
- expect(maintenance.getMode("HOST_D")).andReturn(MaintenanceMode.DRAINED);
+ expect(maintenance.getMode("HOST_D")).andReturn(OFFER_D.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_C")).andReturn(MaintenanceMode.DRAINING);
+ expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_B")).andReturn(MaintenanceMode.SCHEDULED);
+ expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_A")).andReturn(MaintenanceMode.NONE);
+ expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
expect(assigner.maybeAssign(OFFER_A, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
- driver.launchTask(OFFER_A.getId(), mesosTaskA);
+ driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
IScheduledTask taskB = makeTask("B", PENDING);
TaskInfo mesosTaskB = makeTaskInfo(taskB);
expect(assigner.maybeAssign(OFFER_B, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
- driver.launchTask(OFFER_B.getId(), mesosTaskB);
+ driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_D);
- offerQueue.addOffer(OFFER_C);
- offerQueue.addOffer(OFFER_B);
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_D.getOffer());
+ offerQueue.addOffer(OFFER_C.getOffer());
+ offerQueue.addOffer(OFFER_B.getOffer());
+ offerQueue.addOffer(OFFER_A.getOffer());
changeState(taskA, INIT, PENDING);
captureA.getValue().run();
@@ -457,29 +462,31 @@ public class TaskSchedulerTest extends EasyMockTest {
@Test
public void testChangingMaintenancePreferences() {
expectOffer();
- expect(maintenance.getMode("HOST_A")).andReturn(MaintenanceMode.NONE);
+ expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_B")).andReturn(MaintenanceMode.SCHEDULED);
+ expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_C")).andReturn(MaintenanceMode.DRAINED);
+ expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
expect(assigner.maybeAssign(OFFER_B, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
- driver.launchTask(OFFER_B.getId(), mesosTaskA);
+ driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
IScheduledTask taskB = makeTask("B", PENDING);
TaskInfo mesosTaskB = makeTaskInfo(taskB);
- expect(assigner.maybeAssign(OFFER_C, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
- driver.launchTask(OFFER_C.getId(), mesosTaskB);
+ HostOffer updatedOfferC =
+ new HostOffer(OFFER_C.getOffer(), MaintenanceMode.NONE);
+ expect(assigner.maybeAssign(updatedOfferC, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
+ driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
- offerQueue.addOffer(OFFER_B);
- offerQueue.addOffer(OFFER_C);
+ offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_B.getOffer());
+ offerQueue.addOffer(OFFER_C.getOffer());
// Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
@@ -498,7 +505,7 @@ public class TaskSchedulerTest extends EasyMockTest {
TaskInfo mesosTask = makeTaskInfo(task);
Capture<IScheduledTask> taskScheduled = createCapture();
expect(assigner.maybeAssign(
- EasyMock.<Offer>anyObject(),
+ EasyMock.<HostOffer>anyObject(),
capture(taskScheduled),
EasyMock.eq(emptyJob)))
.andReturn(Optional.of(mesosTask));
@@ -543,10 +550,10 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
- offerQueue.addOffer(OFFER_B);
- offerQueue.addOffer(OFFER_C);
- offerQueue.addOffer(OFFER_D);
+ offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_B.getOffer());
+ offerQueue.addOffer(OFFER_C.getOffer());
+ offerQueue.addOffer(OFFER_D.getOffer());
changeState(jobA0, INIT, PENDING);
changeState(jobA1, INIT, PENDING);
changeState(jobA2, INIT, PENDING);
@@ -572,7 +579,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_A.getOffer());
changeState(task, INIT, PENDING);
timeoutCapture.getValue().run();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 4065629..0318179 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -19,6 +19,7 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
@@ -43,6 +44,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
private static final String TASK_ID = "taskId";
private static final String SLAVE = "slaveHost";
+ private static final MaintenanceMode MODE = MaintenanceMode.NONE;
private static final Veto VETO_1 = new Veto("veto1", 1);
private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -65,21 +67,21 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
@Test
public void testEvents() {
Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
- expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
eventSink.post(new Vetoed(TASK_ID, vetoes));
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
}
@Test
public void testNoVetoes() {
Set<Veto> vetoes = ImmutableSet.of();
- expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 6a9c4ee..bffbf83 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -40,7 +40,6 @@ import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -99,13 +98,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Amount.of(DEFAULT_RAM, Data.MB),
Amount.of(DEFAULT_DISK, Data.MB),
0);
+ private static final MaintenanceMode DEFAULT_MODE = MaintenanceMode.NONE;
private AttributeAggregate emptyJob;
private final AtomicLong taskIdCounter = new AtomicLong();
private SchedulingFilter defaultFilter;
- private MaintenanceController maintenance;
private Storage storage;
private StoreProvider storeProvider;
private AttributeStore.Mutable attributeStore;
@@ -113,8 +112,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
@Before
public void setUp() throws Exception {
storage = createMock(Storage.class);
- maintenance = createMock(MaintenanceController.class);
- defaultFilter = new SchedulingFilterImpl(storage, maintenance);
+ defaultFilter = new SchedulingFilterImpl(storage);
storeProvider = createMock(StoreProvider.class);
attributeStore = createMock(AttributeStore.Mutable.class);
emptyJob = new AttributeAggregate(
@@ -141,7 +139,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
@Test
public void testMeetsOffer() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).times(2);
control.replay();
@@ -152,7 +149,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
@Test
public void testSufficientPorts() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).times(4);
control.replay();
@@ -176,33 +172,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
.setRequestedPorts(ImmutableSet.of("one", "two", "three")));
Set<Veto> none = ImmutableSet.of();
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, emptyJob));
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, emptyJob));
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, emptyJob));
+ assertEquals(none,
+ defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, noPortTask, TASK_ID, emptyJob));
+ assertEquals(none,
+ defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, onePortTask, TASK_ID, emptyJob));
+ assertEquals(none,
+ defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, twoPortTask, TASK_ID, emptyJob));
assertEquals(
ImmutableSet.of(PORTS.veto(1)),
- defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, threePortTask, TASK_ID, emptyJob));
}
@Test
public void testInsufficientResources() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).times(4);
control.replay();
assertVetoes(
makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
+ DEFAULT_MODE,
CPU.veto(1), DISK.veto(1), RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), CPU.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DISK.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), DEFAULT_MODE, CPU.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), DEFAULT_MODE, RAM.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DEFAULT_MODE, DISK.veto(1));
}
@Test
public void testDedicatedRole() throws Exception {
expectGetHostAttributes(HOST_A, dedicated(ROLE_A)).anyTimes();
- expectGetHostMaintenanceStatus(HOST_A).times(2);
control.replay();
@@ -216,7 +214,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
String dedicated2 = "kestrel/kestrel";
expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
control.replay();
@@ -240,8 +237,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
public void testMultiValuedAttributes() throws Exception {
expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0")).anyTimes();
expectGetHostAttributes(HOST_B, valueAttribute("jvm", "1.0")).anyTimes();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
control.replay();
@@ -255,39 +250,36 @@ public class SchedulingFilterImplTest extends EasyMockTest {
@Test
public void testHostScheduledForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.SCHEDULED);
control.replay();
- assertNoVetoes(makeTask(), HOST_A);
+ assertNoVetoes(makeTask(), HOST_A, MaintenanceMode.SCHEDULED);
}
@Test
public void testHostDrainingForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINING);
control.replay();
- assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("draining"));
+ assertVetoes(makeTask(),
+ MaintenanceMode.DRAINING,
+ ConstraintFilter.maintenanceVeto("draining"));
}
@Test
public void testHostDrainedForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINED);
control.replay();
- assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("drained"));
+ assertVetoes(makeTask(), MaintenanceMode.DRAINED, ConstraintFilter.maintenanceVeto("drained"));
}
@Test
public void testMultipleTaskConstraints() throws Exception {
expectGetHostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A));
expectGetHostAttributes(HOST_B, dedicated("xxx"), host(HOST_A));
- expectGetHostMaintenanceStatus(HOST_A);
- expectGetHostMaintenanceStatus(HOST_B);
control.replay();
@@ -298,7 +290,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
makeTask(OWNER_A, JOB_A, constraint1, constraint2),
HOST_A,
mismatchVeto(DEDICATED_ATTRIBUTE));
- assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B);
+ assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, DEFAULT_MODE);
}
@Test
@@ -308,8 +300,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expectGetHostAttributes(HOST_A, host(HOST_A));
expectGetHostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B));
- expectGetHostMaintenanceStatus(HOST_A);
- expectGetHostMaintenanceStatus(HOST_B);
control.replay();
@@ -328,11 +318,10 @@ public class SchedulingFilterImplTest extends EasyMockTest {
public void testUnderLimitNoTasks() throws Exception {
expectGetHostAttributes(HOST_A);
expectGetHostAttributes(HOST_A, host(HOST_A));
- expectGetHostMaintenanceStatus(HOST_A);
control.replay();
- assertNoVetoes(hostLimitTask(2), HOST_A);
+ assertNoVetoes(hostLimitTask(2), HOST_A, DEFAULT_MODE);
}
private Attribute host(String host) {
@@ -352,9 +341,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
expectGetHostAttributes(HOST_B, host(HOST_B), rack(RACK_A)).atLeastOnce();
expectGetHostAttributes(HOST_C, host(HOST_C), rack(RACK_B)).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
makeScheduledTask(OWNER_A, JOB_A, HOST_A),
@@ -371,24 +357,48 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, stateA);
- assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
- assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
+ assertVetoes(
+ hostLimitTask(OWNER_A, JOB_A, 1),
+ HOST_B,
+ stateA,
+ DEFAULT_MODE,
+ limitVeto(HOST_ATTRIBUTE));
+ assertVetoes(
+ hostLimitTask(OWNER_A, JOB_A, 2),
+ HOST_B,
+ stateA,
+ DEFAULT_MODE,
+ limitVeto(HOST_ATTRIBUTE));
assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, stateA);
- assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
- assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
+ assertVetoes(
+ rackLimitTask(OWNER_B, JOB_A, 2),
+ HOST_B,
+ stateB,
+ DEFAULT_MODE,
+ limitVeto(RACK_ATTRIBUTE));
+ assertVetoes(
+ rackLimitTask(OWNER_B, JOB_A, 3),
+ HOST_B,
+ stateB,
+ DEFAULT_MODE,
+ limitVeto(RACK_ATTRIBUTE));
assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, stateB);
assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, stateB);
- assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, stateA, limitVeto(RACK_ATTRIBUTE));
+ assertVetoes(
+ rackLimitTask(OWNER_A, JOB_A, 1),
+ HOST_C,
+ stateA,
+ DEFAULT_MODE,
+ limitVeto(RACK_ATTRIBUTE));
assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, stateB);
}
@Test
public void testAttribute() throws Exception {
expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0")).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
control.replay();
@@ -413,7 +423,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expectGetHostAttributes(HOST_A,
valueAttribute("jvm", "1.4", "1.6", "1.7"),
valueAttribute("zone", "a", "b", "c")).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
control.replay();
@@ -437,7 +446,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Constraint zoneConstraint = makeConstraint("zone", "c");
ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
- assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, emptyJob).isEmpty());
+ assertTrue(
+ defaultFilter.filter(
+ DEFAULT_OFFER,
+ HOST_A,
+ DEFAULT_MODE,
+ task, TASK_ID,
+ emptyJob).isEmpty());
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -464,7 +479,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expectGetHostAttributes(HOST_A,
valueAttribute("jvm", "1.4"),
valueAttribute("jvm", "1.6", "1.7")).atLeastOnce();
- expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
control.replay();
@@ -519,14 +533,26 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ITaskConfig task = makeTask(owner, jobName, constraint);
assertEquals(
expected,
- defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, aggregate).isEmpty());
+ defaultFilter.filter(
+ DEFAULT_OFFER,
+ host,
+ DEFAULT_MODE,
+ task,
+ TASK_ID,
+ aggregate).isEmpty());
Constraint negated = constraint.deepCopy();
negated.getConstraint().getValue().setNegated(!value.isNegated());
ITaskConfig negatedTask = makeTask(owner, jobName, negated);
assertEquals(
!expected,
- defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, aggregate).isEmpty());
+ defaultFilter.filter(
+ DEFAULT_OFFER,
+ host,
+ DEFAULT_MODE,
+ negatedTask,
+ TASK_ID,
+ aggregate).isEmpty());
return task;
}
@@ -538,20 +564,27 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertNoVetoes(task, HOST_A, jobState);
}
- private void assertNoVetoes(ITaskConfig task, String host) {
- assertVetoes(task, host, emptyJob);
+ private void assertNoVetoes(ITaskConfig task, String host, MaintenanceMode mode) {
+ assertVetoes(task, host, emptyJob, mode);
}
- private void assertNoVetoes(ITaskConfig task, String host, AttributeAggregate jobState) {
- assertVetoes(task, host, jobState);
+ private void assertNoVetoes(
+ ITaskConfig task,
+ String host,
+ AttributeAggregate jobState) {
+ assertVetoes(task, host, jobState, DEFAULT_MODE);
}
- private void assertVetoes(ITaskConfig task, Veto... vetos) {
- assertVetoes(task, emptyJob, vetos);
+ private void assertVetoes(ITaskConfig task, MaintenanceMode mode, Veto... vetos) {
+ assertVetoes(task, emptyJob, mode, vetos);
}
- private void assertVetoes(ITaskConfig task, AttributeAggregate jobState, Veto... vetos) {
- assertVetoes(task, HOST_A, jobState, vetos);
+ private void assertVetoes(
+ ITaskConfig task,
+ AttributeAggregate jobState,
+ MaintenanceMode mode,
+ Veto... vetos) {
+ assertVetoes(task, HOST_A, jobState, mode, vetos);
}
private void assertVetoes(
@@ -559,18 +592,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
String host,
Veto... vetoes) {
- assertVetoes(task, host, emptyJob, vetoes);
+ assertVetoes(task, host, emptyJob, DEFAULT_MODE, vetoes);
}
private void assertVetoes(
ITaskConfig task,
String host,
AttributeAggregate jobState,
+ MaintenanceMode mode,
Veto... vetoes) {
assertEquals(
ImmutableSet.copyOf(vetoes),
- defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, jobState));
+ defaultFilter.filter(DEFAULT_OFFER, host, mode, task, TASK_ID, jobState));
}
private Attribute valueAttribute(String name, String string, String... strings) {
@@ -583,15 +617,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
}
- private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(String host) {
- return expectGetHostMaintenanceStatus(host, MaintenanceMode.NONE);
- }
-
- private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(
- String host, MaintenanceMode mode) {
- return expect(maintenance.getMode(host)).andReturn(mode);
- }
-
private IExpectationSetters<Optional<IHostAttributes>> expectGetHostAttributes(
String host,
Attribute... attributes) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index c48cbae..c7da5ab 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,6 +20,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.MesosTaskFactory;
@@ -44,6 +45,7 @@ import org.apache.mesos.Protos.Value.Type;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -61,6 +63,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
.setRanges(
Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
.build();
+ private static final HostOffer HOST_OFFER = new HostOffer(OFFER, MaintenanceMode.NONE);
private static final IScheduledTask TASK = IScheduledTask.build(
new ScheduledTask()
.setAssignedTask(new AssignedTask()
@@ -96,6 +99,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
expect(filter.filter(
ResourceSlot.from(OFFER),
OFFER.getHostname(),
+ MaintenanceMode.NONE,
TASK.getAssignedTask().getTask(),
Tasks.id(TASK),
emptyJob))
@@ -110,7 +114,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
+ assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
}
@Test
@@ -118,6 +122,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
expect(filter.filter(
ResourceSlot.from(OFFER),
OFFER.getHostname(),
+ MaintenanceMode.NONE,
TASK.getAssignedTask().getTask(),
Tasks.id(TASK),
emptyJob))
@@ -125,6 +130,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
+ assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
}
}