You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/20 00:53:25 UTC
incubator-aurora git commit: Simplify Preemptor code,
encapsulate parameters used there and in SchedulingFilter.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 4e52d00b8 -> 5cdc3900c
Simplify Preemptor code, encapsulate parameters used there and in SchedulingFilter.
Reviewed at https://reviews.apache.org/r/28103/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5cdc3900
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5cdc3900
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5cdc3900
Branch: refs/heads/master
Commit: 5cdc3900cef5cb99435e1cd8056bebf118d9043c
Parents: 4e52d00
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 19 15:49:36 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Nov 19 15:49:36 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/Preemptor.java | 117 ++++++--------
.../aurora/scheduler/async/TaskScheduler.java | 24 +--
.../org/apache/aurora/scheduler/base/Tasks.java | 9 --
.../configuration/ConfigurationManager.java | 6 +-
.../events/NotifyingSchedulingFilter.java | 16 +-
.../scheduler/filter/ConstraintFilter.java | 139 -----------------
.../scheduler/filter/ConstraintMatcher.java | 131 ++++++++++++++++
.../scheduler/filter/SchedulingFilter.java | 103 +++++++++++--
.../scheduler/filter/SchedulingFilterImpl.java | 151 ++++++-------------
.../aurora/scheduler/state/TaskAssigner.java | 45 +++---
.../aurora/scheduler/stats/ResourceCounter.java | 4 +-
.../scheduler/async/PreemptorImplTest.java | 25 ++-
.../scheduler/async/TaskSchedulerImplTest.java | 14 +-
.../scheduler/async/TaskSchedulerTest.java | 23 +--
.../events/NotifyingSchedulingFilterTest.java | 21 +--
.../filter/SchedulingFilterImplTest.java | 63 +++++---
.../scheduler/state/TaskAssignerImplTest.java | 26 ++--
17 files changed, 460 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index ff26c49..97d5d13 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -26,6 +26,7 @@ import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -50,11 +51,14 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -106,14 +110,6 @@ public interface Preemptor {
static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
- private static final Function<IAssignedTask, Integer> GET_PRIORITY =
- new Function<IAssignedTask, Integer>() {
- @Override
- public Integer apply(IAssignedTask task) {
- return task.getTask().getPriority();
- }
- };
-
private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
// Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
@@ -184,20 +180,11 @@ public interface Preemptor {
}
};
- private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
- return new Predicate<IAssignedTask>() {
- @Override
- public boolean apply(IAssignedTask possibleVictim) {
- return preemptionFilter(possibleVictim).apply(pending);
- }
- };
- }
-
private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
new Function<IAssignedTask, ResourceSlot>() {
@Override
- public ResourceSlot apply(IAssignedTask input) {
- return ResourceSlot.from(input.getTask());
+ public ResourceSlot apply(IAssignedTask task) {
+ return ResourceSlot.from(task.getTask());
}
};
@@ -239,7 +226,7 @@ public interface Preemptor {
Iterable<IAssignedTask> possibleVictims,
Iterable<HostOffer> offers,
IAssignedTask pendingTask,
- AttributeAggregate attributeAggregate) {
+ AttributeAggregate jobState) {
// This enforces the precondition that all of the resources are from the same host. We need to
// get the host for the schedulingFilter.
@@ -263,19 +250,18 @@ public interface Preemptor {
FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
- slackResources,
- attributes,
- pendingTask.getTask(),
- pendingTask.getTaskId(),
- attributeAggregate);
+ new UnusedResource(slackResources, attributes),
+ new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
if (vetoes.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
}
}
- FluentIterable<IAssignedTask> preemptableTasks =
- FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
+ FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
+ .filter(Predicates.compose(
+ preemptionFilter(pendingTask.getTask()),
+ Tasks.ASSIGNED_TO_INFO));
if (preemptableTasks.isEmpty()) {
return Optional.absent();
@@ -299,11 +285,8 @@ public interface Preemptor {
}
Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
- totalResource,
- attributes.get(),
- pendingTask.getTask(),
- pendingTask.getTaskId(),
- attributeAggregate);
+ new UnusedResource(totalResource, attributes.get()),
+ new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
if (vetoes.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -329,12 +312,23 @@ public interface Preemptor {
}
};
+ /**
+ * Order by production flag (true, then false), subsorting by task ID.
+ * TODO(wfarner): Re-evaluate - what do we gain from sorting by task ID?
+ */
+ private static final Ordering<IAssignedTask> SCHEDULING_ORDER =
+ Ordering.explicit(true, false)
+ .onResultOf(Functions.compose(
+ Functions.forPredicate(Tasks.IS_PRODUCTION),
+ Tasks.ASSIGNED_TO_INFO))
+ .compound(Ordering.natural().onResultOf(Tasks.ASSIGNED_TO_ID));
+
private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
// Only non-pending active tasks may be preempted.
List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
// Walk through the preemption candidates in reverse scheduling order.
- Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
+ Collections.sort(activeTasks, SCHEDULING_ORDER.reverse());
// Group the tasks by slave id so they can be paired with offers from the same slave.
return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
@@ -402,9 +396,6 @@ public interface Preemptor {
return Optional.absent();
}
- private static final Predicate<IAssignedTask> IS_PRODUCTION =
- Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
-
/**
* Creates a static filter that will identify tasks that may preempt the provided task.
* A task may preempt another task if the following conditions hold true:
@@ -412,48 +403,32 @@ public interface Preemptor {
* - The tasks are owned by the same user and the priority of {@code preemptableTask} is lower
* OR {@code preemptableTask} is non-production and the compared task is production.
*
- * @param preemptableTask Task to possibly preempt.
+ * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
* @return A filter that will compare the priorities and resources required by other tasks
* with {@code preemptableTask}.
*/
- private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
- Predicate<IAssignedTask> preemptableIsProduction = preemptableTask.getTask().isProduction()
- ? Predicates.<IAssignedTask>alwaysTrue()
- : Predicates.<IAssignedTask>alwaysFalse();
-
- Predicate<IAssignedTask> priorityFilter =
- greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
- return Predicates.or(
- Predicates.and(Predicates.not(preemptableIsProduction), IS_PRODUCTION),
- Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter)
- );
- }
-
- private static Predicate<IAssignedTask> isOwnedBy(final String role) {
- return new Predicate<IAssignedTask>() {
- @Override
- public boolean apply(IAssignedTask task) {
- return getRole(task).equals(role);
- }
- };
- }
-
- private static String getRole(IAssignedTask task) {
- return task.getTask().getJob().getRole();
- }
-
- private static Predicate<Integer> greaterThan(final int value) {
- return new Predicate<Integer>() {
+ private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
+ return new Predicate<ITaskConfig>() {
@Override
- public boolean apply(Integer input) {
- return input > value;
+ public boolean apply(ITaskConfig possibleVictim) {
+ boolean pendingIsProduction = pendingTask.isProduction();
+ boolean victimIsProduction = possibleVictim.isProduction();
+
+ if (pendingIsProduction && !victimIsProduction) {
+ return true;
+ } else if (pendingIsProduction == victimIsProduction) {
+ // If production flags are equal, preemption is based on priority within the same role.
+ if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+ return pendingTask.getPriority() > possibleVictim.getPriority();
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
}
};
}
-
- private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
- return Predicates.compose(greaterThan(priority), GET_PRIORITY);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 6bfa3ac..f247ccf 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -47,6 +47,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
@@ -55,6 +56,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;
@@ -129,9 +131,7 @@ public interface TaskScheduler extends EventSubscriber {
private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
final MutableStoreProvider storeProvider,
- final AttributeAggregate attributeAggregate,
- final String taskId,
- final IScheduledTask task) {
+ final ResourceRequest resourceRequest) {
// TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
// and perform the assignment at the very end. This will allow us to use optimistic locking
@@ -142,16 +142,16 @@ public interface TaskScheduler extends EventSubscriber {
Optional<String> reservedTaskId =
reservations.getSlaveReservation(offer.getOffer().getSlaveId());
if (reservedTaskId.isPresent()) {
- if (taskId.equals(reservedTaskId.get())) {
+ if (resourceRequest.getTaskId().equals(reservedTaskId.get())) {
// Slave is reserved to satisfy this task.
- return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
+ return assigner.maybeAssign(storeProvider, offer, resourceRequest);
} else {
// Slave is reserved for another task.
return Optional.absent();
}
} else {
// Slave is not reserved.
- return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
+ return assigner.maybeAssign(storeProvider, offer, resourceRequest);
}
}
};
@@ -189,16 +189,18 @@ public interface TaskScheduler extends EventSubscriber {
@Override
public Boolean apply(MutableStoreProvider store) {
LOG.fine("Attempting to schedule task " + taskId);
- final IScheduledTask task = Iterables.getOnlyElement(
- store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+ final ITaskConfig task = Iterables.getOnlyElement(
+ Iterables.transform(
+ store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+ Tasks.SCHEDULED_TO_INFO),
null);
if (task == null) {
LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
} else {
- AttributeAggregate aggregate =
- getJobState(store, Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
+ AttributeAggregate aggregate = getJobState(store, task.getJob());
try {
- if (!offerQueue.launchFirst(getAssignerFunction(store, aggregate, taskId, task))) {
+ ResourceRequest resourceRequest = new ResourceRequest(task, taskId, aggregate);
+ if (!offerQueue.launchFirst(getAssignerFunction(store, resourceRequest))) {
// Task could not be scheduled.
maybePreemptFor(taskId, aggregate);
attemptsNoMatch.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index a2997f5..52d37e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -160,15 +160,6 @@ public final class Tasks {
}
};
- /**
- * Order by production flag (true, then false), subsorting by task ID.
- * TODO(Suman Karumuri): Move this call into SchedulerThriftInterface once SchedulerzRole is gone.
- */
- public static final Ordering<IAssignedTask> SCHEDULING_ORDER =
- Ordering.explicit(true, false)
- .onResultOf(Functions.compose(Functions.forPredicate(IS_PRODUCTION), ASSIGNED_TO_INFO))
- .compound(Ordering.natural().onResultOf(ASSIGNED_TO_ID));
-
private Tasks() {
// Utility class.
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 11a80f6..01b0350 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -140,7 +140,7 @@ public final class ConfigurationManager {
new Closure<TaskConfig>() {
@Override
public void execute(TaskConfig task) {
- if (!isDedicated(ITaskConfig.build(task))
+ if (!isDedicated(IConstraint.setFromBuilders(task.getConstraints()))
&& task.isProduction()
&& task.isIsService()
&& !Iterables.any(task.getConstraints(), hasName(RACK_CONSTRAINT))) {
@@ -202,8 +202,8 @@ public final class ConfigurationManager {
return taskConstraint.getSetField() == TaskConstraint._Fields.VALUE;
}
- public static boolean isDedicated(ITaskConfig task) {
- return Iterables.any(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE));
+ public static boolean isDedicated(Iterable<IConstraint> taskConstraints) {
+ return Iterables.any(taskConstraints, getConstraintByName(DEDICATED_ATTRIBUTE));
}
@Nullable
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index ca53303..edaf2f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,12 +20,8 @@ import java.util.Set;
import javax.inject.Inject;
import javax.inject.Qualifier;
-import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -58,16 +54,10 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
}
@Override
- public Set<Veto> filter(
- ResourceSlot offer,
- IHostAttributes hostAttributes,
- ITaskConfig task,
- String taskId,
- AttributeAggregate jobState) {
-
- Set<Veto> vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState);
+ public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
+ Set<Veto> vetoes = delegate.filter(resource, request);
if (!vetoes.isEmpty()) {
- eventSink.post(new Vetoed(taskId, vetoes));
+ eventSink.post(new Vetoed(request.getTaskId(), vetoes));
}
return vetoes;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
deleted file mode 100644
index 3839083..0000000
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.filter;
-
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
-import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Filter that determines whether a task's constraints are satisfied.
- */
-class ConstraintFilter {
- private final AttributeAggregate cachedjobState;
- private final Iterable<IAttribute> hostAttributes;
-
- /**
- * Creates a new constraint filer for a given job.
- *
- * @param cachedjobState Cached information about the job containing the task being matched.
- * @param hostAttributes The attributes of the host to test against.
- */
- ConstraintFilter(AttributeAggregate cachedjobState, Iterable<IAttribute> hostAttributes) {
- this.cachedjobState = requireNonNull(cachedjobState);
- this.hostAttributes = requireNonNull(hostAttributes);
- }
-
- @VisibleForTesting
- static Veto limitVeto(String limit) {
- return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
- }
-
- @VisibleForTesting
- static Veto mismatchVeto(String constraint) {
- return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
- }
-
- @VisibleForTesting
- static Veto maintenanceVeto(String reason) {
- return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
- }
-
- private static final Function<IAttribute, Set<String>> GET_VALUES =
- new Function<IAttribute, Set<String>>() {
- @Override
- public Set<String> apply(IAttribute attribute) {
- return attribute.getValues();
- }
- };
-
- /**
- * Gets the veto (if any) for a scheduling constraint based on the {@link AttributeAggregate} this
- * filter was created with.
- *
- * @param constraint Scheduling filter to check.
- * @return A veto if the constraint is not satisfied based on the existing state of the job.
- */
- Optional<Veto> getVeto(IConstraint constraint) {
- Iterable<IAttribute> sameNameAttributes =
- Iterables.filter(hostAttributes, new NameFilter(constraint.getName()));
- Optional<IAttribute> attribute;
- if (Iterables.isEmpty(sameNameAttributes)) {
- attribute = Optional.absent();
- } else {
- Set<String> attributeValues = ImmutableSet.copyOf(
- Iterables.concat(Iterables.transform(sameNameAttributes, GET_VALUES)));
- attribute =
- Optional.of(IAttribute.build(new Attribute(constraint.getName(), attributeValues)));
- }
-
- ITaskConstraint taskConstraint = constraint.getConstraint();
- switch (taskConstraint.getSetField()) {
- case VALUE:
- boolean matches = AttributeFilter.matches(
- attribute.transform(GET_VALUES).or(ImmutableSet.<String>of()),
- taskConstraint.getValue());
- return matches
- ? Optional.<Veto>absent()
- : Optional.of(mismatchVeto(constraint.getName()));
-
- case LIMIT:
- if (!attribute.isPresent()) {
- return Optional.of(mismatchVeto(constraint.getName()));
- }
-
- boolean satisfied = AttributeFilter.matches(
- attribute.get(),
- taskConstraint.getLimit().getLimit(),
- cachedjobState);
- return satisfied
- ? Optional.<Veto>absent()
- : Optional.of(limitVeto(constraint.getName()));
-
- default:
- throw new SchedulerException("Failed to recognize the constraint type: "
- + taskConstraint.getSetField());
- }
- }
-
- /**
- * A filter to find attributes matching a name.
- */
- static class NameFilter implements Predicate<IAttribute> {
- private final String attributeName;
-
- NameFilter(String attributeName) {
- this.attributeName = attributeName;
- }
-
- @Override
- public boolean apply(IAttribute attribute) {
- return attributeName.equals(attribute.getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
new file mode 100644
index 0000000..cc8c5b8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
+
+/**
+ * Filter that determines whether a task's constraints are satisfied.
+ */
+final class ConstraintMatcher {
+ private ConstraintMatcher() {
+ // Utility class.
+ }
+
+ @VisibleForTesting
+ static Veto limitVeto(String limit) {
+ return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
+ }
+
+ @VisibleForTesting
+ static Veto mismatchVeto(String constraint) {
+ return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
+ }
+
+ @VisibleForTesting
+ static Veto maintenanceVeto(String reason) {
+ return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
+ }
+
+ private static final Function<IAttribute, Set<String>> GET_VALUES =
+ new Function<IAttribute, Set<String>>() {
+ @Override
+ public Set<String> apply(IAttribute attribute) {
+ return attribute.getValues();
+ }
+ };
+
+ /**
+ * Gets the veto (if any) for a scheduling constraint based on the {@link AttributeAggregate} this
+ * filter was created with.
+ *
+ * @param constraint Scheduling filter to check.
+ * @return A veto if the constraint is not satisfied based on the existing state of the job.
+ */
+ static Optional<Veto> getVeto(
+ AttributeAggregate cachedjobState,
+ Iterable<IAttribute> hostAttributes,
+ IConstraint constraint) {
+
+ Iterable<IAttribute> sameNameAttributes =
+ Iterables.filter(hostAttributes, new NameFilter(constraint.getName()));
+ Optional<IAttribute> attribute;
+ if (Iterables.isEmpty(sameNameAttributes)) {
+ attribute = Optional.absent();
+ } else {
+ Set<String> attributeValues = ImmutableSet.copyOf(
+ Iterables.concat(Iterables.transform(sameNameAttributes, GET_VALUES)));
+ attribute =
+ Optional.of(IAttribute.build(new Attribute(constraint.getName(), attributeValues)));
+ }
+
+ ITaskConstraint taskConstraint = constraint.getConstraint();
+ switch (taskConstraint.getSetField()) {
+ case VALUE:
+ boolean matches = AttributeFilter.matches(
+ attribute.transform(GET_VALUES).or(ImmutableSet.<String>of()),
+ taskConstraint.getValue());
+ return matches
+ ? Optional.<Veto>absent()
+ : Optional.of(mismatchVeto(constraint.getName()));
+
+ case LIMIT:
+ if (!attribute.isPresent()) {
+ return Optional.of(mismatchVeto(constraint.getName()));
+ }
+
+ boolean satisfied = AttributeFilter.matches(
+ attribute.get(),
+ taskConstraint.getLimit().getLimit(),
+ cachedjobState);
+ return satisfied
+ ? Optional.<Veto>absent()
+ : Optional.of(limitVeto(constraint.getName()));
+
+ default:
+ throw new SchedulerException("Failed to recognize the constraint type: "
+ + taskConstraint.getSetField());
+ }
+ }
+
+ /**
+ * A filter to find attributes matching a name.
+ */
+ static class NameFilter implements Predicate<IAttribute> {
+ private final String attributeName;
+
+ NameFilter(String attributeName) {
+ this.attributeName = attributeName;
+ }
+
+ @Override
+ public boolean apply(IAttribute attribute) {
+ return attributeName.equals(attribute.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index c1c5f26..723e7ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -19,6 +19,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -102,19 +103,101 @@ public interface SchedulingFilter {
}
/**
+ * An available resource in the cluster.
+ */
+ class UnusedResource {
+ private final ResourceSlot offer;
+ private final IHostAttributes attributes;
+
+ public UnusedResource(ResourceSlot offer, IHostAttributes attributes) {
+ this.offer = offer;
+ this.attributes = attributes;
+ }
+
+ public ResourceSlot getResourceSlot() {
+ return offer;
+ }
+
+ public IHostAttributes getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof UnusedResource)) {
+ return false;
+ }
+
+ UnusedResource other = (UnusedResource) o;
+ return Objects.equals(getResourceSlot(), other.getResourceSlot())
+ && Objects.equals(getAttributes(), other.getAttributes());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(offer, attributes);
+ }
+ }
+
+ /**
+ * A request for resources in the cluster.
+ */
+ class ResourceRequest {
+ private final ITaskConfig task;
+ private final String taskId;
+ private final AttributeAggregate jobState;
+
+ public ResourceRequest(ITaskConfig task, String taskId, AttributeAggregate jobState) {
+ this.task = task;
+ this.taskId = taskId;
+ this.jobState = jobState;
+ }
+
+ public Iterable<IConstraint> getConstraints() {
+ return task.getConstraints();
+ }
+
+ public ResourceSlot getResourceSlot() {
+ return ResourceSlot.from(task);
+ }
+
+ public AttributeAggregate getJobState() {
+ return jobState;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public int getNumRequestedPorts() {
+ return task.getRequestedPorts().size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ResourceRequest)) {
+ return false;
+ }
+
+ ResourceRequest other = (ResourceRequest) o;
+ return Objects.equals(task, other.task)
+ && Objects.equals(getTaskId(), other.getTaskId())
+ && Objects.equals(getJobState(), other.getJobState());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(task, taskId, jobState);
+ }
+ }
+
+ /**
* Applies a task against the filter with the given resources, and on the host.
*
- * @param offer Resources offered.
- * @param task Task.
- * @param taskId Canonical ID of the task.
- * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
+ * @param resource An available resource in the cluster.
+ * @param request A resource request to match against the {@code resource}.
* @return A set of vetoes indicating reasons the task cannot be scheduled. If the task may be
* scheduled, the set will be empty.
*/
- Set<Veto> filter(
- ResourceSlot offer,
- IHostAttributes attributes,
- ITaskConfig task,
- String taskId,
- AttributeAggregate attributeAggregate);
+ Set<Veto> filter(UnusedResource resource, ResourceRequest request);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index cc6b53b..cf3bb64 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -18,7 +18,6 @@ import java.util.EnumSet;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -31,9 +30,9 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
@@ -46,7 +45,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
/**
* Implementation of the scheduling filter that ensures resource requirements of tasks are
* fulfilled, and that tasks are allowed to run on the given machine.
- *
*/
public class SchedulingFilterImpl implements SchedulingFilter {
@@ -57,23 +55,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
- /**
- * A function that may veto a task.
- */
- private interface FilterRule extends Function<ITaskConfig, Iterable<Veto>> { }
-
- /**
- * Convenience class for a rule that will only ever have a single veto.
- */
- private abstract static class AbstractSingleVetoRule implements FilterRule {
- @Override
- public final Iterable<Veto> apply(ITaskConfig task) {
- return doApply(task).asSet();
- }
-
- abstract Optional<Veto> doApply(ITaskConfig task);
- }
-
// Scaling ranges to use for comparison of vetos. This has no real bearing besides trying to
// determine if a veto along one resource vector is a 'stronger' veto than that of another vector.
// The values below represent the maximum resources on a typical slave machine.
@@ -115,39 +96,25 @@ public class SchedulingFilterImpl implements SchedulingFilter {
}
}
- private Iterable<FilterRule> rulesFromOffer(final ResourceSlot available) {
- return ImmutableList.<FilterRule>of(
- new AbstractSingleVetoRule() {
- @Override
- public Optional<Veto> doApply(ITaskConfig task) {
- return CPU.maybeVeto(
- available.getNumCpus(),
- ResourceSlot.from(task).getNumCpus());
- }
- },
- new AbstractSingleVetoRule() {
- @Override
- public Optional<Veto> doApply(ITaskConfig task) {
- return RAM.maybeVeto(
- available.getRam().as(Data.MB),
- ResourceSlot.from(task).getRam().as(Data.MB));
- }
- },
- new AbstractSingleVetoRule() {
- @Override
- public Optional<Veto> doApply(ITaskConfig task) {
- return DISK.maybeVeto(available.getDisk().as(Data.MB),
- ResourceSlot.from(task).getDisk().as(Data.MB));
- }
- },
- new AbstractSingleVetoRule() {
- @Override
- public Optional<Veto> doApply(ITaskConfig task) {
- return PORTS.maybeVeto(available.getNumPorts(),
- ResourceSlot.from(task).getNumPorts());
- }
- }
- );
+ private static void maybeAddVeto(
+ ImmutableList.Builder<Veto> vetoes,
+ ResourceVector vector,
+ double available,
+ double requested) {
+
+ Optional<Veto> veto = vector.maybeVeto(available, requested);
+ if (veto.isPresent()) {
+ vetoes.add(veto.get());
+ }
+ }
+
+ private static Iterable<Veto> getResourceVetoes(ResourceSlot available, ResourceSlot required) {
+ ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+ maybeAddVeto(vetoes, CPU, available.getNumCpus(), required.getNumCpus());
+ maybeAddVeto(vetoes, RAM, available.getRam().as(Data.MB), required.getRam().as(Data.MB));
+ maybeAddVeto(vetoes, DISK, available.getDisk().as(Data.MB), required.getDisk().as(Data.MB));
+ maybeAddVeto(vetoes, PORTS, available.getNumPorts(), required.getNumPorts());
+ return vetoes.build();
}
private static boolean isValueConstraint(IConstraint constraint) {
@@ -165,78 +132,58 @@ public class SchedulingFilterImpl implements SchedulingFilter {
}
});
- private FilterRule getConstraintFilter(
- final AttributeAggregate jobState,
- final IHostAttributes offerAttributes) {
-
- return new FilterRule() {
- @Override
- public Iterable<Veto> apply(final ITaskConfig task) {
- if (!task.isSetConstraints()) {
- return ImmutableList.of();
- }
-
- ConstraintFilter constraintFilter = new ConstraintFilter(
- jobState,
- offerAttributes.getAttributes());
- ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
- for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
- Optional<Veto> veto = constraintFilter.getVeto(constraint);
- if (veto.isPresent()) {
- vetoes.add(veto.get());
- if (isValueConstraint(constraint)) {
- // Break when a value constraint mismatch is found to avoid other
- // potentially-expensive operations to satisfy other constraints.
- break;
- }
- }
+ private Iterable<Veto> getConstraintVetoes(
+ Iterable<IConstraint> taskConstraints,
+ AttributeAggregate jobState,
+ Iterable<IAttribute> offerAttributes) {
+
+ ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+ for (IConstraint constraint : VALUES_FIRST.sortedCopy(taskConstraints)) {
+ Optional<Veto> veto = ConstraintMatcher.getVeto(jobState, offerAttributes, constraint);
+ if (veto.isPresent()) {
+ vetoes.add(veto.get());
+ if (isValueConstraint(constraint)) {
+ // Break when a value constraint mismatch is found to avoid other
+ // potentially-expensive operations to satisfy other constraints.
+ break;
}
-
- return vetoes.build();
}
- };
+ }
+
+ return vetoes.build();
}
private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
return VETO_MODES.contains(mode)
- ? Optional.of(ConstraintFilter.maintenanceVeto(mode.toString().toLowerCase()))
+ ? Optional.of(ConstraintMatcher.maintenanceVeto(mode.toString().toLowerCase()))
: NO_VETO;
}
- private Set<Veto> getResourceVetoes(ResourceSlot offer, ITaskConfig task) {
- ImmutableSet.Builder<Veto> builder = ImmutableSet.builder();
- for (FilterRule rule : rulesFromOffer(offer)) {
- builder.addAll(rule.apply(task));
- }
- return builder.build();
- }
-
private boolean isDedicated(IHostAttributes attributes) {
return Iterables.any(
attributes.getAttributes(),
- new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+ new ConstraintMatcher.NameFilter(DEDICATED_ATTRIBUTE));
}
@Override
- public Set<Veto> filter(
- ResourceSlot offer,
- IHostAttributes attributes,
- ITaskConfig task,
- String taskId,
- AttributeAggregate attributeAggregate) {
-
- if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) {
+ public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
+ if (!ConfigurationManager.isDedicated(request.getConstraints())
+ && isDedicated(resource.getAttributes())) {
+
return ImmutableSet.of(DEDICATED_HOST_VETO);
}
- Optional<Veto> maintenanceVeto = getMaintenanceVeto(attributes.getMode());
+ Optional<Veto> maintenanceVeto = getMaintenanceVeto(resource.getAttributes().getMode());
if (maintenanceVeto.isPresent()) {
return maintenanceVeto.asSet();
}
return ImmutableSet.<Veto>builder()
- .addAll(getConstraintFilter(attributeAggregate, attributes).apply(task))
- .addAll(getResourceVetoes(offer, task))
+ .addAll(getConstraintVetoes(
+ request.getConstraints(),
+ request.getJobState(),
+ resource.getAttributes().getAttributes()))
+ .addAll(getResourceVetoes(resource.getResourceSlot(), request.getResourceSlot()))
.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 4abc7ba..e1c2974 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -22,14 +22,13 @@ import com.google.common.base.Optional;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.TaskInfo;
import static java.util.Objects.requireNonNull;
@@ -48,15 +47,13 @@ public interface TaskAssigner {
*
* @param storeProvider Storage provider.
* @param offer The resource offer.
- * @param task The task to match against and optionally assign.
- * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
+ * @param resourceRequest The request for resources being scheduled.
* @return Instructions for launching the task if matching and assignment were successful.
*/
Optional<TaskInfo> maybeAssign(
MutableStoreProvider storeProvider,
HostOffer offer,
- IScheduledTask task,
- AttributeAggregate attributeAggregate);
+ ResourceRequest resourceRequest);
class TaskAssignerImpl implements TaskAssigner {
private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -76,18 +73,22 @@ public interface TaskAssigner {
this.taskFactory = requireNonNull(taskFactory);
}
- private TaskInfo assign(MutableStoreProvider storeProvider, Offer offer, IScheduledTask task) {
+ private TaskInfo assign(
+ MutableStoreProvider storeProvider,
+ Offer offer,
+ int numRequestedPorts,
+ String taskId) {
+
String host = offer.getHostname();
- Set<Integer> selectedPorts =
- Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
+ Set<Integer> selectedPorts = Resources.getPorts(offer, numRequestedPorts);
IAssignedTask assigned = stateManager.assignTask(
storeProvider,
- Tasks.id(task),
+ taskId,
host,
offer.getSlaveId(),
selectedPorts);
LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
- host, offer.getSlaveId().getValue(), Tasks.id(task)));
+ host, offer.getSlaveId().getValue(), taskId));
return taskFactory.createFrom(assigned, offer.getSlaveId());
}
@@ -95,20 +96,20 @@ public interface TaskAssigner {
public Optional<TaskInfo> maybeAssign(
MutableStoreProvider storeProvider,
HostOffer offer,
- IScheduledTask task,
- AttributeAggregate attributeAggregate) {
+ ResourceRequest resourceRequest) {
Set<Veto> vetoes = filter.filter(
- ResourceSlot.from(offer.getOffer()),
- offer.getAttributes(),
- task.getAssignedTask().getTask(),
- Tasks.id(task),
- attributeAggregate);
+ new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+ resourceRequest);
if (vetoes.isEmpty()) {
- return Optional.of(assign(storeProvider, offer.getOffer(), task));
+ return Optional.of(assign(
+ storeProvider,
+ offer.getOffer(),
+ resourceRequest.getNumRequestedPorts(),
+ resourceRequest.getTaskId()));
} else {
- LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
- + ": " + vetoes);
+ LOG.fine("Slave " + offer.getOffer().getHostname()
+ + " vetoed task " + resourceRequest.getTaskId() + ": " + vetoes);
return Optional.absent();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index 79d12b0..c30a8c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -136,7 +136,7 @@ public class ResourceCounter {
DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
@Override
public boolean apply(ITaskConfig task) {
- return ConfigurationManager.isDedicated(task);
+ return ConfigurationManager.isDedicated(task.getConstraints());
}
}),
QUOTA_CONSUMED(new Predicate<ITaskConfig>() {
@@ -148,7 +148,7 @@ public class ResourceCounter {
FREE_POOL_CONSUMED(new Predicate<ITaskConfig>() {
@Override
public boolean apply(ITaskConfig task) {
- return !ConfigurationManager.isDedicated(task) && !task.isProduction();
+ return !ConfigurationManager.isDedicated(task.getConstraints()) && !task.isProduction();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 8b0367e..69108cf 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -44,12 +44,13 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -58,7 +59,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -571,18 +571,15 @@ public class PreemptorImplTest extends EasyMockTest {
private IExpectationSetters<Set<Veto>> expectFiltering() {
return expect(schedulingFilter.filter(
- EasyMock.<ResourceSlot>anyObject(),
- EasyMock.<IHostAttributes>anyObject(),
- EasyMock.<ITaskConfig>anyObject(),
- EasyMock.<String>anyObject(),
- EasyMock.eq(emptyJob))).andAnswer(
- new IAnswer<Set<Veto>>() {
- @Override
- public Set<Veto> answer() {
- return ImmutableSet.of();
- }
- }
- );
+ EasyMock.<UnusedResource>anyObject(),
+ EasyMock.<ResourceRequest>anyObject()))
+ .andAnswer(
+ new IAnswer<Set<Veto>>() {
+ @Override
+ public Set<Veto> answer() {
+ return ImmutableSet.of();
+ }
+ });
}
private void expectPreempted(ScheduledTask preempted) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 9bc6a75..17f2d77 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -43,6 +43,7 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -134,7 +135,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
private void expectAssigned(IScheduledTask task) {
- expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, task, emptyJob))
+ expect(assigner.maybeAssign(
+ storageUtil.mutableStoreProvider,
+ OFFER,
+ new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob)))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
}
@@ -194,7 +198,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
- expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, TASK_B, emptyJob))
+ expect(assigner.maybeAssign(
+ storageUtil.mutableStoreProvider,
+ OFFER,
+ new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob)))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
control.replay();
@@ -307,8 +314,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expect(assigner.maybeAssign(
EasyMock.<MutableStoreProvider>anyObject(),
eq(OFFER),
- eq(taskA),
- eq(emptyJob)))
+ eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob))))
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
control.replay();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index d1bc9bf..012804a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -51,6 +51,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
@@ -298,8 +299,7 @@ public class TaskSchedulerTest extends EasyMockTest {
return expect(assigner.maybeAssign(
EasyMock.<MutableStoreProvider>anyObject(),
eq(offer),
- eq(task),
- eq(jobAggregate)));
+ eq(new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), jobAggregate))));
}
@Test
@@ -524,17 +524,16 @@ public class TaskSchedulerTest extends EasyMockTest {
captureB.getValue().run();
}
- private Capture<IScheduledTask> expectTaskScheduled(IScheduledTask task) {
+ private Capture<ResourceRequest> expectTaskScheduled(IScheduledTask task) {
TaskInfo mesosTask = makeTaskInfo(task);
- Capture<IScheduledTask> taskScheduled = createCapture();
+ Capture<ResourceRequest> request = createCapture();
expect(assigner.maybeAssign(
EasyMock.<MutableStoreProvider>anyObject(),
EasyMock.<HostOffer>anyObject(),
- capture(taskScheduled),
- EasyMock.eq(emptyJob)))
+ capture(request)))
.andReturn(Optional.of(mesosTask));
driver.launchTask(EasyMock.<OfferID>anyObject(), eq(mesosTask));
- return taskScheduled;
+ return request;
}
@Test
@@ -566,8 +565,8 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- Capture<IScheduledTask> firstScheduled = expectTaskScheduled(jobA0);
- Capture<IScheduledTask> secondScheduled = expectTaskScheduled(jobB0);
+ Capture<ResourceRequest> firstScheduled = expectTaskScheduled(jobA0);
+ Capture<ResourceRequest> secondScheduled = expectTaskScheduled(jobB0);
// Expect another watch of the task group for job A.
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -585,8 +584,10 @@ public class TaskSchedulerTest extends EasyMockTest {
timeoutA.getValue().run();
timeoutB.getValue().run();
assertEquals(
- ImmutableSet.of(jobA0, jobB0),
- ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
+ ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
+ ImmutableSet.of(
+ firstScheduled.getValue().getTaskId(),
+ secondScheduled.getValue().getTaskId()));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 94f0a17..d408ec0 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -26,6 +26,8 @@ import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -43,10 +45,11 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
.setNumCpus(1)
.setRamMb(1024)
.setDiskMb(1024));
- private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
private static final String TASK_ID = "taskId";
- private static final IHostAttributes ATTRIBUTES =
- IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE));
+ private static final UnusedResource RESOURCE = new UnusedResource(
+ ResourceSlot.from(TASK),
+ IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
+ private ResourceRequest request;
private static final Veto VETO_1 = new Veto("veto1", 1);
private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -54,36 +57,36 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
private SchedulingFilter filter;
private EventSink eventSink;
private SchedulingFilter delegate;
- private AttributeAggregate emptyJob;
@Before
public void setUp() {
delegate = createMock(SchedulingFilter.class);
eventSink = createMock(EventSink.class);
filter = new NotifyingSchedulingFilter(delegate, eventSink);
- emptyJob = new AttributeAggregate(
+ AttributeAggregate emptyJob = new AttributeAggregate(
Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
createMock(AttributeStore.class));
+ request = new ResourceRequest(TASK, "taskId", emptyJob);
}
@Test
public void testEvents() {
Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
- expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
eventSink.post(new Vetoed(TASK_ID, vetoes));
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(RESOURCE, request));
}
@Test
public void testNoVetoes() {
Set<Veto> vetoes = ImmutableSet.of();
- expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(RESOURCE, request));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index e113eba..d1a7066 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -39,6 +39,8 @@ import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -54,8 +56,8 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.filter.ConstraintFilter.limitVeto;
-import static org.apache.aurora.scheduler.filter.ConstraintFilter.mismatchVeto;
+import static org.apache.aurora.scheduler.filter.ConstraintMatcher.limitVeto;
+import static org.apache.aurora.scheduler.filter.ConstraintMatcher.mismatchVeto;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.DEDICATED_HOST_VETO;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
@@ -63,7 +65,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class SchedulingFilterImplTest extends EasyMockTest {
@@ -171,15 +172,26 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Set<Veto> none = ImmutableSet.of();
IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
- assertEquals(none,
- defaultFilter.filter(twoPorts, hostA, noPortTask, TASK_ID, emptyJob));
- assertEquals(none,
- defaultFilter.filter(twoPorts, hostA, onePortTask, TASK_ID, emptyJob));
- assertEquals(none,
- defaultFilter.filter(twoPorts, hostA, twoPortTask, TASK_ID, emptyJob));
+ assertEquals(
+ none,
+ defaultFilter.filter(
+ new UnusedResource(twoPorts, hostA),
+ new ResourceRequest(noPortTask, TASK_ID, emptyJob)));
+ assertEquals(
+ none,
+ defaultFilter.filter(
+ new UnusedResource(twoPorts, hostA),
+ new ResourceRequest(onePortTask, TASK_ID, emptyJob)));
+ assertEquals(
+ none,
+ defaultFilter.filter(
+ new UnusedResource(twoPorts, hostA),
+ new ResourceRequest(twoPortTask, TASK_ID, emptyJob)));
assertEquals(
ImmutableSet.of(PORTS.veto(1)),
- defaultFilter.filter(twoPorts, hostA, threePortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(
+ new UnusedResource(twoPorts, hostA),
+ new ResourceRequest(threePortTask, TASK_ID, emptyJob)));
}
@Test
@@ -260,7 +272,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertVetoes(makeTask(),
hostAttributes(HOST_A, MaintenanceMode.DRAINING, host(HOST_A), rack(RACK_A)),
- ConstraintFilter.maintenanceVeto("draining"));
+ ConstraintMatcher.maintenanceVeto("draining"));
}
@Test
@@ -270,7 +282,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertVetoes(
makeTask(),
hostAttributes(HOST_A, MaintenanceMode.DRAINED, host(HOST_A), rack(RACK_A)),
- ConstraintFilter.maintenanceVeto("drained"));
+ ConstraintMatcher.maintenanceVeto("drained"));
}
@Test
@@ -435,8 +447,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Constraint zoneConstraint = makeConstraint("zone", "c");
ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
- assertTrue(
- defaultFilter.filter(DEFAULT_OFFER, hostA, task, TASK_ID, emptyJob).isEmpty());
+ assertEquals(
+ ImmutableSet.<Veto>of(),
+ defaultFilter.filter(
+ new UnusedResource(DEFAULT_OFFER, hostA),
+ new ResourceRequest(task, TASK_ID, emptyJob)));
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -524,11 +539,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertEquals(
expected,
defaultFilter.filter(
- DEFAULT_OFFER,
- hostAttributes,
- task,
- TASK_ID,
- aggregate).isEmpty());
+ new UnusedResource(DEFAULT_OFFER, hostAttributes),
+ new ResourceRequest(task, TASK_ID, aggregate))
+ .isEmpty());
Constraint negated = constraint.deepCopy();
negated.getConstraint().getValue().setNegated(!value.isNegated());
@@ -536,11 +549,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertEquals(
!expected,
defaultFilter.filter(
- DEFAULT_OFFER,
- hostAttributes,
- negatedTask,
- TASK_ID,
- aggregate).isEmpty());
+ new UnusedResource(DEFAULT_OFFER, hostAttributes),
+ new ResourceRequest(negatedTask, TASK_ID, aggregate))
+ .isEmpty());
return task;
}
@@ -568,7 +579,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertEquals(
ImmutableSet.copyOf(vetoes),
- defaultFilter.filter(DEFAULT_OFFER, hostAttributes, task, TASK_ID, jobState));
+ defaultFilter.filter(
+ new UnusedResource(DEFAULT_OFFER, hostAttributes),
+ new ResourceRequest(task, TASK_ID, jobState)));
}
private static IHostAttributes hostAttributes(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index a8a70b6..411a55a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -28,6 +28,8 @@ import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
@@ -102,11 +104,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
@Test
public void testAssignNoVetoes() {
expect(filter.filter(
- ResourceSlot.from(MESOS_OFFER),
- OFFER.getAttributes(),
- TASK.getAssignedTask().getTask(),
- Tasks.id(TASK),
- emptyJob))
+ new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
.andReturn(ImmutableSet.<Veto>of());
expect(stateManager.assignTask(
storeProvider,
@@ -122,23 +121,26 @@ public class TaskAssignerImplTest extends EasyMockTest {
assertEquals(
Optional.of(TASK_INFO),
- assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
+ assigner.maybeAssign(
+ storeProvider,
+ OFFER,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
}
@Test
public void testAssignVetoes() {
expect(filter.filter(
- ResourceSlot.from(MESOS_OFFER),
- OFFER.getAttributes(),
- TASK.getAssignedTask().getTask(),
- Tasks.id(TASK),
- emptyJob))
+ new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
.andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
control.replay();
assertEquals(
Optional.<TaskInfo>absent(),
- assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
+ assigner.maybeAssign(
+ storeProvider,
+ OFFER,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
}
}