You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/27 19:27:18 UTC
aurora git commit: Scheduling multiple tasks per round.
Repository: aurora
Updated Branches:
refs/heads/master 60e5e4e67 -> f559e9306
Scheduling multiple tasks per round.
Reviewed at https://reviews.apache.org/r/51929/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f559e930
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f559e930
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f559e930
Branch: refs/heads/master
Commit: f559e930659e25b3d7cacb7b845ebda50d18d66a
Parents: 60e5e4e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Sep 27 12:27:04 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Sep 27 12:27:04 2016 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 19 +-
.../scheduler/filter/AttributeAggregate.java | 41 ++--
.../scheduler/scheduling/SchedulingModule.java | 8 +-
.../aurora/scheduler/scheduling/TaskGroup.java | 12 +-
.../aurora/scheduler/scheduling/TaskGroups.java | 58 +++---
.../scheduler/scheduling/TaskScheduler.java | 127 +++++++-----
.../aurora/scheduler/state/TaskAssigner.java | 41 ++--
.../events/NotifyingSchedulingFilterTest.java | 2 +-
.../filter/AttributeAggregateTest.java | 32 +++
.../filter/SchedulingFilterImplTest.java | 18 +-
.../scheduler/http/AbstractJettyTest.java | 3 +-
.../preemptor/PreemptionVictimFilterTest.java | 4 +-
.../scheduler/preemptor/PreemptorImplTest.java | 6 +-
.../preemptor/PreemptorModuleTest.java | 2 +-
.../scheduler/scheduling/TaskGroupsTest.java | 55 ++++--
.../scheduling/TaskSchedulerImplTest.java | 111 ++++++++---
.../scheduler/state/TaskAssignerImplTest.java | 197 +++++++++++--------
17 files changed, 475 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 6f1cbfb..1b56500 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
+import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -192,7 +193,9 @@ public class SchedulingBenchmarks {
saveTasks(tasksToAssign);
storage.write((NoResult.Quiet) store -> {
for (IScheduledTask scheduledTask : tasksToAssign) {
- taskScheduler.schedule(store, scheduledTask.getAssignedTask().getTaskId());
+ taskScheduler.schedule(
+ store,
+ ImmutableSet.of(scheduledTask.getAssignedTask().getTaskId()));
}
});
}
@@ -220,11 +223,13 @@ public class SchedulingBenchmarks {
* See {@see http://openjdk.java.net/projects/code-tools/jmh/} for more info.
*/
@Benchmark
- public boolean runBenchmark() {
- return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> {
- boolean result = false;
+ public Set<String> runBenchmark() {
+ return storage.write((Storage.MutateWork.Quiet<Set<String>>) store -> {
+ Set<String> result = null;
for (IScheduledTask task : settings.getTasks()) {
- result = taskScheduler.schedule(store, task.getAssignedTask().getTaskId());
+ result = taskScheduler.schedule(
+ store,
+ ImmutableSet.of(task.getAssignedTask().getTaskId()));
}
return result;
});
@@ -313,10 +318,10 @@ public class SchedulingBenchmarks {
}
@Override
- public boolean runBenchmark() {
+ public Set<String> runBenchmark() {
pendingTaskProcessor.run();
// Return non-guessable result to satisfy "blackhole" requirement.
- return System.currentTimeMillis() % 5 == 0;
+ return ImmutableSet.of("" + System.currentTimeMillis());
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 87b9e19..f04149e 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IAttribute;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -46,7 +47,7 @@ public final class AttributeAggregate {
* A mapping from attribute name and value to the count of tasks with that name/value combination.
* See doc for {@link #getNumTasksWithAttribute(String, String)} for further details.
*/
- private final Supplier<Multiset<Pair<String, String>>> aggregate;
+ private Supplier<Multiset<Pair<String, String>>> aggregate;
private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
this.aggregate = Suppliers.memoize(aggregate);
@@ -92,25 +93,35 @@ public final class AttributeAggregate {
@VisibleForTesting
static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) {
Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose(
- attributes1 -> {
- ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder();
- for (IAttribute attribute : attributes1) {
- for (String value : attribute.getValues()) {
- builder.add(Pair.of(attribute.getName(), value));
- }
- }
-
- return builder.build();
- },
- attributes
- );
+ attributes1 -> addAttributes(ImmutableMultiset.builder(), attributes1).build(),
+ attributes);
return new AttributeAggregate(aggregator);
}
+ static ImmutableMultiset.Builder<Pair<String, String>> addAttributes(
+ ImmutableMultiset.Builder<Pair<String, String>> builder,
+ Iterable<IAttribute> attributes) {
+
+ for (IAttribute attribute : attributes) {
+ for (String value : attribute.getValues()) {
+ builder.add(Pair.of(attribute.getName(), value));
+ }
+ }
+ return builder;
+ }
+
+ public void updateAttributeAggregate(IHostAttributes attributes) {
+ ImmutableMultiset.Builder<Pair<String, String>> builder = new ImmutableMultiset.Builder<>();
+ builder.addAll(aggregate.get());
+ addAttributes(builder, attributes.getAttributes());
+ aggregate = Suppliers.memoize(() -> builder.build());
+ }
+
@VisibleForTesting
- public static final AttributeAggregate EMPTY =
- new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of()));
+ public static AttributeAggregate empty() {
+ return new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of()));
+ }
/**
* Gets the total number of tasks with a given attribute name and value combination.
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 664bc6c..03a0e84 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -90,6 +90,11 @@ public class SchedulingModule extends AbstractModule {
help = "The maximum number of scheduling attempts that can be processed in a batch.")
private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3);
+ @Positive
+ @CmdLine(name = "max_tasks_per_schedule_attempt",
+ help = "The maximum number of tasks to pick in a single scheduling attempt.")
+ private static final Arg<Integer> MAX_TASKS_PER_SCHEDULE_ATTEMPT = Arg.create(5);
+
@Override
protected void configure() {
install(new PrivateModule() {
@@ -100,7 +105,8 @@ public class SchedulingModule extends AbstractModule {
new TruncatedBinaryBackoff(
INITIAL_SCHEDULE_PENALTY.get(),
MAX_SCHEDULE_PENALTY.get()),
- RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
+ RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()),
+ MAX_TASKS_PER_SCHEDULE_ATTEMPT.get()));
bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
.toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
index 5d31955..b521620 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
@@ -13,15 +13,17 @@
*/
package org.apache.aurora.scheduler.scheduling;
+import java.util.Collection;
import java.util.Queue;
import java.util.Set;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.aurora.scheduler.base.TaskGroupKey;
+import static org.apache.aurora.GuavaUtils.toImmutableSet;
+
/**
* A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
*/
@@ -41,16 +43,16 @@ class TaskGroup {
return key;
}
- synchronized Optional<String> peek() {
- return Optional.fromNullable(tasks.peek());
+ synchronized Set<String> peek(int maxTasks) {
+ return tasks.stream().limit(Math.min(tasks.size(), maxTasks)).collect(toImmutableSet());
}
synchronized boolean hasMore() {
return !tasks.isEmpty();
}
- synchronized void remove(String taskId) {
- tasks.remove(taskId);
+ synchronized void remove(Collection<String> taskIdsToRemove) {
+ tasks.removeAll(taskIdsToRemove);
}
synchronized void offer(String taskId) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index d390c07..77187bc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.scheduling;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -23,7 +24,6 @@ import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
@@ -71,11 +71,10 @@ public class TaskGroups implements EventSubscriber {
private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
private final DelayExecutor executor;
+ private final TaskGroupsSettings settings;
private final TaskScheduler taskScheduler;
- private final long firstScheduleDelay;
- private final BackoffStrategy backoff;
private final RescheduleCalculator rescheduleCalculator;
- private final BatchWorker<Boolean> batchWorker;
+ private final BatchWorker<Set<String>> batchWorker;
// Track the penalties of tasks at the time they were scheduled. This is to provide data that
// may influence the selection of a different backoff strategy.
@@ -91,7 +90,7 @@ public class TaskGroups implements EventSubscriber {
public @interface SchedulingMaxBatchSize { }
@VisibleForTesting
- public static class TaskGroupBatchWorker extends BatchWorker<Boolean> {
+ public static class TaskGroupBatchWorker extends BatchWorker<Set<String>> {
@Inject
TaskGroupBatchWorker(
Storage storage,
@@ -111,15 +110,20 @@ public class TaskGroups implements EventSubscriber {
private final Amount<Long, Time> firstScheduleDelay;
private final BackoffStrategy taskGroupBackoff;
private final RateLimiter rateLimiter;
+ private final int maxTasksPerSchedule;
public TaskGroupsSettings(
Amount<Long, Time> firstScheduleDelay,
BackoffStrategy taskGroupBackoff,
- RateLimiter rateLimiter) {
+ RateLimiter rateLimiter,
+ int maxTasksPerSchedule) {
this.firstScheduleDelay = requireNonNull(firstScheduleDelay);
+ Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
this.taskGroupBackoff = requireNonNull(taskGroupBackoff);
this.rateLimiter = requireNonNull(rateLimiter);
+ this.maxTasksPerSchedule = maxTasksPerSchedule;
+ Preconditions.checkArgument(maxTasksPerSchedule > 0);
}
}
@@ -131,21 +135,11 @@ public class TaskGroups implements EventSubscriber {
RescheduleCalculator rescheduleCalculator,
TaskGroupBatchWorker batchWorker) {
- requireNonNull(settings.firstScheduleDelay);
- Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
-
this.executor = requireNonNull(executor);
- requireNonNull(settings.rateLimiter);
- requireNonNull(taskScheduler);
- this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
- this.backoff = requireNonNull(settings.taskGroupBackoff);
+ this.settings = requireNonNull(settings);
+ this.taskScheduler = requireNonNull(taskScheduler);
this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
this.batchWorker = requireNonNull(batchWorker);
-
- this.taskScheduler = (store, taskId) -> {
- settings.rateLimiter.acquire();
- return taskScheduler.schedule(store, taskId);
- };
}
private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
@@ -162,27 +156,29 @@ public class TaskGroups implements EventSubscriber {
Runnable monitor = new Runnable() {
@Override
public void run() {
- final Optional<String> taskId = group.peek();
+ final Set<String> taskIds = group.peek(settings.maxTasksPerSchedule);
long penaltyMs = 0;
- if (taskId.isPresent()) {
- CompletableFuture<Boolean> result = batchWorker.execute(storeProvider ->
- taskScheduler.schedule(storeProvider, taskId.get()));
- boolean isScheduled = false;
+ if (!taskIds.isEmpty()) {
+ settings.rateLimiter.acquire();
+ CompletableFuture<Set<String>> result = batchWorker.execute(storeProvider ->
+ taskScheduler.schedule(storeProvider, taskIds));
+
+ Set<String> scheduled = null;
try {
- isScheduled = result.get();
+ scheduled = result.get();
} catch (ExecutionException | InterruptedException e) {
Thread.currentThread().interrupt();
Throwables.propagate(e);
}
- if (isScheduled) {
+ if (scheduled.isEmpty()) {
+ penaltyMs = settings.taskGroupBackoff.calculateBackoffMs(group.getPenaltyMs());
+ } else {
scheduledTaskPenalties.accumulate(group.getPenaltyMs());
- group.remove(taskId.get());
+ group.remove(scheduled);
if (group.hasMore()) {
- penaltyMs = firstScheduleDelay;
+ penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS);
}
- } else {
- penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
}
}
@@ -211,7 +207,7 @@ public class TaskGroups implements EventSubscriber {
if (existing == null) {
long penaltyMs;
if (stateChange.isTransition()) {
- penaltyMs = firstScheduleDelay;
+ penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS);
} else {
penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
}
@@ -234,7 +230,7 @@ public class TaskGroups implements EventSubscriber {
: Iterables.transform(deleted.getTasks(), IScheduledTask::getAssignedTask)) {
TaskGroup group = groups.get(TaskGroupKey.from(task.getTask()));
if (group != null) {
- group.remove(task.getTaskId());
+ group.remove(ImmutableSet.of(task.getTaskId()));
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 207d38d..31edb1d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -15,14 +15,21 @@ package org.apache.aurora.scheduler.scheduling;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
@@ -51,6 +58,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
+
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
@@ -63,11 +72,11 @@ public interface TaskScheduler extends EventSubscriber {
* Attempts to schedule a task, possibly performing irreversible actions.
*
* @param storeProvider {@code MutableStoreProvider} instance to access data store.
- * @param taskId The task to attempt to schedule.
- * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
- * call schedule again if {@code false} is returned.
+ * @param taskIds The tasks to attempt to schedule.
+ * @return Successfully scheduled task IDs. The caller should call schedule again if a given
+ * task ID was not present in the result.
*/
- boolean schedule(MutableStoreProvider storeProvider, String taskId);
+ Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
/**
* An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
@@ -109,62 +118,84 @@ public interface TaskScheduler extends EventSubscriber {
}
@Timed ("task_schedule_attempt")
- public boolean schedule(MutableStoreProvider store, String taskId) {
- attemptsFired.incrementAndGet();
+ public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) {
try {
- return scheduleTask(store, taskId);
+ return scheduleTasks(store, taskIds);
} catch (RuntimeException e) {
// We catch the generic unchecked exception here to ensure tasks are not abandoned
// if there is a transient issue resulting in an unchecked exception.
LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
attemptsFailed.incrementAndGet();
- return false;
+ // Return empty set for all task IDs to be retried later.
+ // It's ok if some tasks were already assigned, those will be ignored in the next round.
+ return ImmutableSet.of();
}
}
- private boolean scheduleTask(MutableStoreProvider store, String taskId) {
- LOG.debug("Attempting to schedule task " + taskId);
- IAssignedTask assignedTask = Iterables.getOnlyElement(
- Iterables.transform(
- store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
- IScheduledTask::getAssignedTask),
- null);
-
- if (assignedTask == null) {
- LOG.warn("Failed to look up task " + taskId + ", it may have been deleted.");
- } else {
- ITaskConfig task = assignedTask.getTask();
- AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-
- // Valid Docker tasks can have a container but no executor config
- ResourceBag overhead = ResourceBag.EMPTY;
- if (task.isSetExecutorConfig()) {
- overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
- .orElseThrow(
- () -> new IllegalArgumentException("Cannot find executor configuration"));
- }
+ private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) {
+ ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
+ String taskIdValues = Joiner.on(",").join(taskIds);
+ LOG.debug("Attempting to schedule tasks " + taskIdValues);
+ ImmutableSet<IAssignedTask> assignedTasks =
+ ImmutableSet.copyOf(Iterables.transform(
+ store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
+ IScheduledTask::getAssignedTask));
+
+ if (Iterables.isEmpty(assignedTasks)) {
+ LOG.warn("Failed to look up all tasks in a scheduling round: " + taskIdValues);
+ return taskIds;
+ }
- boolean launched = assigner.maybeAssign(
- store,
- new ResourceRequest(
- task,
- bagFromResources(task.getResources()).add(overhead), aggregate),
- TaskGroupKey.from(task),
- taskId,
- reservations.asMap());
-
- if (!launched) {
- // Task could not be scheduled.
- // TODO(maxim): Now that preemption slots are searched asynchronously, consider
- // retrying a launch attempt within the current scheduling round IFF a reservation is
- // available.
- maybePreemptFor(assignedTask, aggregate, store);
- attemptsNoMatch.incrementAndGet();
- return false;
- }
+ Preconditions.checkState(
+ assignedTasks.stream()
+ .collect(Collectors.groupingBy(t -> t.getTask()))
+ .entrySet()
+ .size() == 1,
+ "Found multiple task groups for " + taskIdValues);
+
+ Map<String, IAssignedTask> assignableTaskMap =
+ assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+
+ if (taskIds.size() != assignedTasks.size()) {
+ LOG.warn("Failed to look up tasks "
+ + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+ }
+
+ // This is safe after all checks above.
+ ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+ AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
+
+ // Valid Docker tasks can have a container but no executor config
+ ResourceBag overhead = ResourceBag.EMPTY;
+ if (task.isSetExecutorConfig()) {
+ overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+ .orElseThrow(
+ () -> new IllegalArgumentException("Cannot find executor configuration"));
}
- return true;
+ Set<String> launched = assigner.maybeAssign(
+ store,
+ new ResourceRequest(
+ task,
+ bagFromResources(task.getResources()).add(overhead), aggregate),
+ TaskGroupKey.from(task),
+ assignableTaskMap.keySet(),
+ reservations.asMap());
+
+ attemptsFired.addAndGet(assignableTaskMap.size());
+ Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+
+ failedToLaunch.forEach(taskId -> {
+ // Task could not be scheduled.
+ // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+ // retrying a launch attempt within the current scheduling round IFF a reservation is
+ // available.
+ maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+ });
+ attemptsNoMatch.addAndGet(failedToLaunch.size());
+
+ // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
+ return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
}
private void maybePreemptFor(
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7f7b435..4c61762 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.state;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -21,6 +22,8 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.Stats;
@@ -60,15 +63,15 @@ public interface TaskAssigner {
* @param storeProvider Storage provider.
* @param resourceRequest The request for resources being scheduled.
* @param groupKey Task group key.
- * @param taskId Task id to assign.
+ * @param taskIds Task IDs to assign.
* @param slaveReservations Slave reservations.
- * @return Assignment result.
+ * @return Successfully assigned task IDs.
*/
- boolean maybeAssign(
+ Set<String> maybeAssign(
MutableStoreProvider storeProvider,
ResourceRequest resourceRequest,
TaskGroupKey groupKey,
- String taskId,
+ Iterable<String> taskIds,
Map<String, TaskGroupKey> slaveReservations);
class TaskAssignerImpl implements TaskAssigner {
@@ -132,13 +135,22 @@ public interface TaskAssigner {
@Timed("assigner_maybe_assign")
@Override
- public boolean maybeAssign(
+ public Set<String> maybeAssign(
MutableStoreProvider storeProvider,
ResourceRequest resourceRequest,
TaskGroupKey groupKey,
- String taskId,
+ Iterable<String> taskIds,
Map<String, TaskGroupKey> slaveReservations) {
+ if (Iterables.isEmpty(taskIds)) {
+ return ImmutableSet.of();
+ }
+
+ TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
+ ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+ Iterator<String> remainingTasks = taskIds.iterator();
+ String taskId = remainingTasks.next();
+
for (HostOffer offer : offerManager.getOffers(groupKey)) {
Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
slaveReservations.get(offer.getOffer().getSlaveId().getValue()));
@@ -148,7 +160,6 @@ public interface TaskAssigner {
continue;
}
- TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
Set<Veto> vetoes = filter.filter(
new UnusedResource(offer.getResourceBag(tierInfo), offer.getAttributes()),
resourceRequest);
@@ -159,9 +170,17 @@ public interface TaskAssigner {
offer.getOffer(),
taskId);
+ resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+
try {
offerManager.launchTask(offer.getOffer().getId(), taskInfo);
- return true;
+ assignmentResult.add(taskId);
+
+ if (remainingTasks.hasNext()) {
+ taskId = remainingTasks.next();
+ } else {
+ break;
+ }
} catch (OfferManager.LaunchException e) {
LOG.warn("Failed to launch task.", e);
launchFailures.incrementAndGet();
@@ -177,19 +196,19 @@ public interface TaskAssigner {
Optional.of(PENDING),
LOST,
LAUNCH_FAILED_MSG);
- return false;
+ break;
}
} else {
if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
// Never attempt to match this offer/groupKey pair again.
offerManager.banOffer(offer.getOffer().getId(), groupKey);
}
-
LOG.debug("Agent " + offer.getOffer().getHostname()
+ " vetoed task " + taskId + ": " + vetoes);
}
}
- return false;
+
+ return assignmentResult.build();
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index ece476b..b759427 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -49,7 +49,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
ResourceManager.bagFromResources(TASK.getResources()),
IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
private static final ResourceRequest REQUEST =
- new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.EMPTY);
+ new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
index 209f929..7496a70 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
@@ -133,6 +133,38 @@ public class AttributeAggregateTest extends EasyMockTest {
assertAggregate(aggregate, "hostc", "2", 0L);
}
+ @Test
+ public void testUpdateAttributeAggregate() {
+ expectGetAttributes(
+ "a1",
+ attribute("host", "a1"),
+ attribute("rack", "a"),
+ attribute("pdu", "p1"));
+
+ control.replay();
+
+ Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder()
+ .add(Pair.of("rack", "a"))
+ .add(Pair.of("host", "a1"))
+ .add(Pair.of("pdu", "p1"))
+ .build();
+
+ AttributeAggregate aggregate = aggregate(task("1", "a1"));
+ assertEquals(expected, aggregate.getAggregates());
+
+ aggregate.updateAttributeAggregate(IHostAttributes.build(new HostAttributes()
+ .setHost("a2")
+ .setAttributes(ImmutableSet.of(attribute("host", "a2"), attribute("rack", "b")))));
+
+ expected = ImmutableMultiset.<Pair<String, String>>builder()
+ .addAll(expected)
+ .add(Pair.of("rack", "b"))
+ .add(Pair.of("host", "a2"))
+ .build();
+
+ assertEquals(expected, aggregate.getAggregates());
+ }
+
private AttributeAggregate aggregate(IScheduledTask... activeTasks) {
return AttributeAggregate.create(
Suppliers.ofInstance(ImmutableSet.copyOf(activeTasks)),
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0cf23df..1d7f9f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -53,7 +53,7 @@ import static org.apache.aurora.gen.Resource.diskMb;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -128,22 +128,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(noPortTask, bag(noPortTask), EMPTY)));
+ new ResourceRequest(noPortTask, bag(noPortTask), empty())));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(onePortTask, bag(onePortTask), EMPTY)));
+ new ResourceRequest(onePortTask, bag(onePortTask), empty())));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(twoPortTask, bag(twoPortTask), EMPTY)));
+ new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
assertEquals(
ImmutableSet.of(veto(PORTS, 1)),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(threePortTask, bag(threePortTask), EMPTY)));
+ new ResourceRequest(threePortTask, bag(threePortTask), empty())));
}
@Test
@@ -409,7 +409,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ImmutableSet.of(),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(task, bag(task), EMPTY)));
+ new ResourceRequest(task, bag(task), empty())));
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -499,7 +499,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
return checkConstraint(
job,
- EMPTY,
+ empty(),
hostAttributes,
constraintName,
expected,
@@ -537,7 +537,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
- assertVetoes(task, hostAttributes, EMPTY);
+ assertVetoes(task, hostAttributes, empty());
}
private void assertNoVetoes(
@@ -549,7 +549,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
- assertVetoes(task, hostAttributes, EMPTY, vetoes);
+ assertVetoes(task, hostAttributes, empty(), vetoes);
}
private void assertVetoes(
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index c1c3eca..fb03f25 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -120,7 +120,8 @@ public abstract class AbstractJettyTest extends EasyMockTest {
new TaskGroupsSettings(
Amount.of(1L, Time.MILLISECONDS),
bindMock(BackoffStrategy.class),
- RateLimiter.create(1000)));
+ RateLimiter.create(1000),
+ 5));
bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor);
bindMock(CronJobManager.class);
bindMock(LockManager.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index ee5c652..64da234 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -65,7 +65,7 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
@@ -129,7 +129,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
return filter.filterPreemptionVictims(
ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
preemptionVictims(victims),
- EMPTY,
+ empty(),
offer,
storageUtil.mutableStoreProvider);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index 98048fa..40c42b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -45,7 +45,7 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName;
import static org.easymock.EasyMock.anyObject;
@@ -133,7 +133,7 @@ public class PreemptorImplTest extends EasyMockTest {
}
private Optional<String> callPreemptor() {
- return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
+ return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), empty(), storeProvider);
}
private void expectSlotValidation(
@@ -143,7 +143,7 @@ public class PreemptorImplTest extends EasyMockTest {
expect(preemptionVictimFilter.filterPreemptionVictims(
TASK.getAssignedTask().getTask(),
slot.getVictims(),
- EMPTY,
+ empty(),
Optional.of(OFFER),
storeProvider)).andReturn(victims);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index 2c3e5f3..67b6d69 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -78,7 +78,7 @@ public class PreemptorModuleTest extends EasyMockTest {
Optional.absent(),
injector.getInstance(Preemptor.class).attemptPreemptionFor(
IAssignedTask.build(new AssignedTask()),
- AttributeAggregate.EMPTY,
+ AttributeAggregate.empty(),
storageUtil.mutableStoreProvider));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 8872962..566e0d9 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -13,6 +13,8 @@
*/
package org.apache.aurora.scheduler.scheduling;
+import java.util.Set;
+
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
@@ -27,8 +29,8 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker;
import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -50,6 +52,7 @@ public class TaskGroupsTest extends EasyMockTest {
private static final Amount<Long, Time> RESCHEDULE_DELAY = FIRST_SCHEDULE_DELAY;
private static final IJobKey JOB_A = IJobKey.build(new JobKey("role", "test", "jobA"));
private static final String TASK_A_ID = "a";
+ private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_A_ID);
private BackoffStrategy backoffStrategy;
private TaskScheduler taskScheduler;
@@ -73,7 +76,7 @@ public class TaskGroupsTest extends EasyMockTest {
batchWorker = createMock(TaskGroupBatchWorker.class);
taskGroups = new TaskGroups(
executor,
- new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
+ new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter, 2),
taskScheduler,
rescheduleCalculator,
batchWorker);
@@ -82,8 +85,10 @@ public class TaskGroupsTest extends EasyMockTest {
@Test
public void testEvaluatedAfterFirstSchedulePenalty() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D);
- expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
- expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+ expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+ .andReturn(SCHEDULED_RESULT);
+ expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT)
+ .anyTimes();
control.replay();
@@ -95,15 +100,17 @@ public class TaskGroupsTest extends EasyMockTest {
public void testTaskDeletedBeforeEvaluating() throws Exception {
final IScheduledTask task = makeTask(TASK_A_ID);
expect(rateLimiter.acquire()).andReturn(0D);
- expect(taskScheduler.schedule(anyObject(), eq(Tasks.id(task)))).andAnswer(() -> {
- // Test a corner case where a task is deleted while it is being evaluated by the task
- // scheduler. If not handled carefully, this could result in the scheduler trying again
- // later to satisfy the deleted task.
- taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
-
- return false;
- });
- expectBatchExecute(batchWorker, storageUtil.storage, control, false).anyTimes();
+ expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+ .andAnswer(() -> {
+ // Test a corner case where a task is deleted while it is being evaluated by the task
+ // scheduler. If not handled carefully, this could result in the scheduler trying again
+ // later to satisfy the deleted task.
+ taskGroups.tasksDeleted(new PubsubEvent.TasksDeleted(ImmutableSet.of(task)));
+
+ return ImmutableSet.of();
+ });
+ expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of())
+ .anyTimes();
expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS)))
.andReturn(0L);
@@ -117,8 +124,10 @@ public class TaskGroupsTest extends EasyMockTest {
public void testEvaluatedOnStartup() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D);
expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
- expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
- expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+ expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+ .andReturn(ImmutableSet.of(TASK_A_ID));
+ expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT)
+ .anyTimes();
control.replay();
@@ -128,11 +137,19 @@ public class TaskGroupsTest extends EasyMockTest {
}
@Test
- public void testResistStarvation() throws Exception {
+ public void testMultipleTasksAndResistStarvation() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D).times(2);
- expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true);
- expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true);
- expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+ expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("a0", "a1"))))
+ .andReturn(ImmutableSet.of("a0", "a1"));
+ expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("b0"))))
+ .andReturn(ImmutableSet.of("b0"));
+ expectBatchExecute(
+ batchWorker,
+ storageUtil.storage,
+ control,
+ ImmutableSet.of("a0", "a1")).anyTimes();
+ expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of("b0"))
+ .anyTimes();
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index a4e87d2..fa1a8178 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.scheduling;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import com.google.common.base.Function;
@@ -63,21 +64,23 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
public class TaskSchedulerImplTest extends EasyMockTest {
-
+ private static final String TASK_ID = "a";
private static final IScheduledTask TASK_A =
- TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
+ TaskTestUtil.makeTask(TASK_ID, JobKeys.from("a", "a", "a"));
private static final TaskGroupKey GROUP_KEY =
TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
private static final String SLAVE_ID = "HOST_A";
private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+ private static final ImmutableSet<String> SINGLE_TASK = ImmutableSet.of(TASK_ID);
+ private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_ID);
+ private static final Set<String> NOT_SCHEDULED_RESULT = ImmutableSet.of();
private StorageTestUtil storageUtil;
private TaskAssigner assigner;
@@ -134,15 +137,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
.getName()).get());
}
- private IExpectationSetters<Boolean> expectAssigned(
+ private IExpectationSetters<Set<String>> expectAssigned(
IScheduledTask task,
Map<String, TaskGroupKey> reservationMap) {
return expect(assigner.maybeAssign(
storageUtil.mutableStoreProvider,
- new ResourceRequest(task.getAssignedTask().getTask(), bag(task), EMPTY),
+ new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()),
TaskGroupKey.from(task.getAssignedTask().getTask()),
- Tasks.id(task),
+ ImmutableSet.of(Tasks.id(task)),
reservationMap));
}
@@ -153,11 +156,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAsMap(NO_RESERVATION);
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT);
control.replay();
- assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
}
@Test
@@ -169,7 +174,45 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+ }
+
+ @Test
+ public void testSchedulePartial() throws Exception {
+ storageUtil.expectOperations();
+
+ String taskB = "b";
+ expectAsMap(NO_RESERVATION);
+ storageUtil.expectTaskFetch(
+ Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING),
+ ImmutableSet.of(TASK_A));
+ expectActiveJobFetch(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT);
+
+ control.replay();
+
+ // Task b should be returned as well to be purged from its TaskGroup.
+ assertEquals(
+ ImmutableSet.of(TASK_ID, taskB),
+ scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB)));
+ }
+
+ @Test
+ public void testMultipleGroupsRejected() {
+ storageUtil.expectOperations();
+
+ String taskB = "b";
+ storageUtil.expectTaskFetch(
+ Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING),
+ ImmutableSet.of(TASK_A, TaskTestUtil.makeTask(taskB, JobKeys.from("b", "b", "b"))));
+
+ control.replay();
+
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB)));
}
@Test
@@ -179,15 +222,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
// No reservation available in preemptor
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
expectAsMap(NO_RESERVATION);
expectNoReservation(TASK_A);
- expectPreemptorCall(TASK_A, Optional.<String>absent());
+ expectPreemptorCall(TASK_A, Optional.absent());
// Slave is reserved.
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
expectAsMap(NO_RESERVATION);
expectNoReservation(TASK_A);
expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
@@ -197,13 +240,19 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
expectAsMap(ImmutableMap.of(SLAVE_ID, GROUP_KEY));
- expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(true);
+ expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(SCHEDULED_RESULT);
control.replay();
- assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
- assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
- assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+ assertEquals(
+ SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
}
@Test
@@ -213,12 +262,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
expectAsMap(NO_RESERVATION);
- expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
expectGetReservation(TASK_A, SLAVE_ID);
control.replay();
- assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
}
@Test
@@ -228,12 +279,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
expectAsMap(NO_RESERVATION);
- expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
expectGetReservation(TASK_A, SLAVE_ID);
control.replay();
- assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
}
@Test
@@ -274,15 +327,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAsMap(NO_RESERVATION);
expect(assigner.maybeAssign(
EasyMock.anyObject(),
- eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), EMPTY)),
+ eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())),
eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
- eq(Tasks.id(taskA)),
- eq(NO_RESERVATION))).andReturn(true);
+ eq(SINGLE_TASK),
+ eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT);
control.replay();
memStorage.write((NoResult.Quiet)
- store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA))));
+ store -> assertEquals(SCHEDULED_RESULT, scheduler.schedule(store, SINGLE_TASK)));
}
@Test
@@ -296,13 +349,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertEquals(
+ NOT_SCHEDULED_RESULT,
+ scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
}
private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
expect(preemptor.attemptPreemptionFor(
task.getAssignedTask(),
- EMPTY,
+ empty(),
storageUtil.mutableStoreProvider)).andReturn(result);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index b4d27f6..b482be5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.state;
import java.util.Map;
+import java.util.Set;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
@@ -21,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.TaskConfig;
@@ -28,6 +30,7 @@ import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -57,7 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -70,8 +73,7 @@ import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
public class TaskAssignerImplTest extends EasyMockTest {
@@ -79,7 +81,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue();
private static final HostOffer OFFER =
- new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
+ new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+ .setHost(MESOS_OFFER.getHostname())
+ .setAttributes(ImmutableSet.of(
+ new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
private static final IScheduledTask TASK = makeTask("id", JOB);
private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
@@ -91,10 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
private static final UnusedResource UNUSED = new UnusedResource(
bagFromMesosResources(MESOS_OFFER.getResourcesList()),
OFFER.getAttributes());
- private static final ResourceRequest RESOURCE_REQUEST = new ResourceRequest(
- TASK.getAssignedTask().getTask(),
- ResourceBag.EMPTY,
- EMPTY);
+ private static final HostOffer OFFER_2 = new HostOffer(
+ Offer.newBuilder()
+ .setId(OfferID.newBuilder().setValue("offerId0"))
+ .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+ .setSlaveId(SlaveID.newBuilder().setValue("slaveId0"))
+ .setHostname("hostName0")
+ .addResources(Resource.newBuilder()
+ .setName("ports")
+ .setType(Type.RANGES)
+ .setRanges(
+ Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
+ .build(),
+ IHostAttributes.build(new HostAttributes()));
+
+ private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+ private ResourceRequest resourceRequest;
private MutableStoreProvider storeProvider;
private StateManager stateManager;
@@ -113,26 +131,43 @@ public class TaskAssignerImplTest extends EasyMockTest {
offerManager = createMock(OfferManager.class);
tierManager = createMock(TierManager.class);
assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager, tierManager);
+ resourceRequest = new ResourceRequest(
+ TASK.getAssignedTask().getTask(),
+ ResourceBag.EMPTY,
+ empty());
}
@Test
- public void testAssignNoVetoes() throws Exception {
+ public void testAssignNoTasks() throws Exception {
+ control.replay();
+
+ assertEquals(
+ NO_ASSIGNMENT,
+ assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+ }
+
+ @Test
+ public void testAssignPartialNoVetoes() throws Exception {
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
+ expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
expectAssignTask(MESOS_OFFER);
expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
.andReturn(TASK_INFO);
control.replay();
- assertTrue(assigner.maybeAssign(
- storeProvider,
- new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, EMPTY),
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+ AttributeAggregate aggregate = empty();
+ assertEquals(
+ ImmutableSet.of(Tasks.id(TASK)),
+ assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+ ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+ assertNotEquals(empty(), aggregate);
}
@Test
@@ -140,43 +175,47 @@ public class TaskAssignerImplTest extends EasyMockTest {
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, RESOURCE_REQUEST))
+ expect(filter.filter(UNUSED, resourceRequest))
.andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
control.replay();
- assertFalse(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- NO_RESERVATION));
+ assertEquals(
+ NO_ASSIGNMENT,
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK)),
+ NO_RESERVATION));
}
@Test
public void testAssignVetoesWithNoStaticBan() throws Exception {
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, RESOURCE_REQUEST))
+ expect(filter.filter(UNUSED, resourceRequest))
.andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
control.replay();
- assertFalse(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- NO_RESERVATION));
+ assertEquals(
+ NO_ASSIGNMENT,
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK)),
+ NO_RESERVATION));
}
@Test
public void testAssignmentClearedOnError() throws Exception {
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
+ expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
expectAssignTask(MESOS_OFFER);
expect(stateManager.changeState(
storeProvider,
@@ -190,27 +229,33 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertFalse(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- NO_RESERVATION));
+ // Ensures scheduling loop terminates on the first launch failure.
+ assertEquals(
+ NO_ASSIGNMENT,
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+ NO_RESERVATION));
}
@Test
public void testAssignmentSkippedForReservedSlave() throws Exception {
+ expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
control.replay();
- assertFalse(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
- ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+ assertEquals(
+ NO_ASSIGNMENT,
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK)),
+ ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+ ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
}
@Test
@@ -218,36 +263,28 @@ public class TaskAssignerImplTest extends EasyMockTest {
// Ensures slave/task reservation relationship is only enforced in slave->task direction
// and permissive in task->slave direction. In other words, a task with a slave reservation
// should still be tried against other unreserved slaves.
- HostOffer offer = new HostOffer(
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue("offerId0"))
- .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
- .setSlaveId(SlaveID.newBuilder().setValue("slaveId0"))
- .setHostname("hostName0")
- .addResources(Resource.newBuilder()
- .setName("ports")
- .setType(Type.RANGES)
- .setRanges(
- Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
- .build(),
- IHostAttributes.build(new HostAttributes()));
-
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
- expectAssignTask(offer.getOffer());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
+ expect(filter.filter(
+ new UnusedResource(
+ bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
+ OFFER_2.getAttributes()),
+ resourceRequest)).andReturn(ImmutableSet.of());
+ expectAssignTask(OFFER_2.getOffer());
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer()))
.andReturn(TASK_INFO);
- offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
+ offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
control.replay();
- assertTrue(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+ assertEquals(
+ ImmutableSet.of(Tasks.id(TASK)),
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK)),
+ ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
}
@Test
@@ -268,18 +305,18 @@ public class TaskAssignerImplTest extends EasyMockTest {
IHostAttributes.build(new HostAttributes()));
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER).times(2);
+ expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expect(filter.filter(
new UnusedResource(
bagFromMesosResources(mismatched.getOffer().getResourcesList()),
mismatched.getAttributes()),
- RESOURCE_REQUEST))
+ resourceRequest))
.andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY);
expect(filter.filter(
new UnusedResource(
bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
- RESOURCE_REQUEST))
+ resourceRequest))
.andReturn(ImmutableSet.of());
expectAssignTask(MESOS_OFFER);
@@ -289,12 +326,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertTrue(assigner.maybeAssign(
- storeProvider,
- RESOURCE_REQUEST,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- Tasks.id(TASK),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+ assertEquals(
+ ImmutableSet.of(Tasks.id(TASK)),
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ ImmutableSet.of(Tasks.id(TASK)),
+ ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
}
@Test