You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/27 19:27:18 UTC

aurora git commit: Scheduling multiple tasks per round.

Repository: aurora
Updated Branches:
  refs/heads/master 60e5e4e67 -> f559e9306


Scheduling multiple tasks per round.

Reviewed at https://reviews.apache.org/r/51929/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f559e930
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f559e930
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f559e930

Branch: refs/heads/master
Commit: f559e930659e25b3d7cacb7b845ebda50d18d66a
Parents: 60e5e4e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Sep 27 12:27:04 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Sep 27 12:27:04 2016 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |  19 +-
 .../scheduler/filter/AttributeAggregate.java    |  41 ++--
 .../scheduler/scheduling/SchedulingModule.java  |   8 +-
 .../aurora/scheduler/scheduling/TaskGroup.java  |  12 +-
 .../aurora/scheduler/scheduling/TaskGroups.java |  58 +++---
 .../scheduler/scheduling/TaskScheduler.java     | 127 +++++++-----
 .../aurora/scheduler/state/TaskAssigner.java    |  41 ++--
 .../events/NotifyingSchedulingFilterTest.java   |   2 +-
 .../filter/AttributeAggregateTest.java          |  32 +++
 .../filter/SchedulingFilterImplTest.java        |  18 +-
 .../scheduler/http/AbstractJettyTest.java       |   3 +-
 .../preemptor/PreemptionVictimFilterTest.java   |   4 +-
 .../scheduler/preemptor/PreemptorImplTest.java  |   6 +-
 .../preemptor/PreemptorModuleTest.java          |   2 +-
 .../scheduler/scheduling/TaskGroupsTest.java    |  55 ++++--
 .../scheduling/TaskSchedulerImplTest.java       | 111 ++++++++---
 .../scheduler/state/TaskAssignerImplTest.java   | 197 +++++++++++--------
 17 files changed, 475 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 6f1cbfb..1b56500 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Singleton;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.eventbus.EventBus;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -192,7 +193,9 @@ public class SchedulingBenchmarks {
       saveTasks(tasksToAssign);
       storage.write((NoResult.Quiet) store -> {
         for (IScheduledTask scheduledTask : tasksToAssign) {
-          taskScheduler.schedule(store, scheduledTask.getAssignedTask().getTaskId());
+          taskScheduler.schedule(
+              store,
+              ImmutableSet.of(scheduledTask.getAssignedTask().getTaskId()));
         }
       });
     }
@@ -220,11 +223,13 @@ public class SchedulingBenchmarks {
      * See {@see http://openjdk.java.net/projects/code-tools/jmh/} for more info.
      */
     @Benchmark
-    public boolean runBenchmark() {
-      return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> {
-        boolean result = false;
+    public Set<String> runBenchmark() {
+      return storage.write((Storage.MutateWork.Quiet<Set<String>>) store -> {
+        Set<String> result = null;
         for (IScheduledTask task : settings.getTasks()) {
-          result = taskScheduler.schedule(store, task.getAssignedTask().getTaskId());
+          result = taskScheduler.schedule(
+              store,
+              ImmutableSet.of(task.getAssignedTask().getTaskId()));
         }
         return result;
       });
@@ -313,10 +318,10 @@ public class SchedulingBenchmarks {
     }
 
     @Override
-    public boolean runBenchmark() {
+    public Set<String> runBenchmark() {
       pendingTaskProcessor.run();
       // Return non-guessable result to satisfy "blackhole" requirement.
-      return System.currentTimeMillis() % 5 == 0;
+      return ImmutableSet.of("" + System.currentTimeMillis());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 87b9e19..f04149e 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -46,7 +47,7 @@ public final class AttributeAggregate {
    * A mapping from attribute name and value to the count of tasks with that name/value combination.
    * See doc for {@link #getNumTasksWithAttribute(String, String)} for further details.
    */
-  private final Supplier<Multiset<Pair<String, String>>> aggregate;
+  private Supplier<Multiset<Pair<String, String>>> aggregate;
 
   private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
     this.aggregate = Suppliers.memoize(aggregate);
@@ -92,25 +93,35 @@ public final class AttributeAggregate {
   @VisibleForTesting
   static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) {
     Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose(
-        attributes1 -> {
-          ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder();
-          for (IAttribute attribute : attributes1) {
-            for (String value : attribute.getValues()) {
-              builder.add(Pair.of(attribute.getName(), value));
-            }
-          }
-
-          return builder.build();
-        },
-        attributes
-    );
+        attributes1 -> addAttributes(ImmutableMultiset.builder(), attributes1).build(),
+        attributes);
 
     return new AttributeAggregate(aggregator);
   }
 
+  static ImmutableMultiset.Builder<Pair<String, String>> addAttributes(
+      ImmutableMultiset.Builder<Pair<String, String>> builder,
+      Iterable<IAttribute> attributes) {
+
+    for (IAttribute attribute : attributes) {
+      for (String value : attribute.getValues()) {
+        builder.add(Pair.of(attribute.getName(), value));
+      }
+    }
+    return builder;
+  }
+
+  public void updateAttributeAggregate(IHostAttributes attributes) {
+    ImmutableMultiset.Builder<Pair<String, String>> builder = new ImmutableMultiset.Builder<>();
+    builder.addAll(aggregate.get());
+    addAttributes(builder, attributes.getAttributes());
+    aggregate = Suppliers.memoize(() -> builder.build());
+  }
+
   @VisibleForTesting
-  public static final AttributeAggregate EMPTY =
-      new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of()));
+  public static AttributeAggregate empty() {
+    return new AttributeAggregate(Suppliers.ofInstance(ImmutableMultiset.of()));
+  }
 
   /**
    * Gets the total number of tasks with a given attribute name and value combination.

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 664bc6c..03a0e84 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -90,6 +90,11 @@ public class SchedulingModule extends AbstractModule {
       help = "The maximum number of scheduling attempts that can be processed in a batch.")
   private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3);
 
+  @Positive
+  @CmdLine(name = "max_tasks_per_schedule_attempt",
+      help = "The maximum number of tasks to pick in a single scheduling attempt.")
+  private static final Arg<Integer> MAX_TASKS_PER_SCHEDULE_ATTEMPT = Arg.create(5);
+
   @Override
   protected void configure() {
     install(new PrivateModule() {
@@ -100,7 +105,8 @@ public class SchedulingModule extends AbstractModule {
             new TruncatedBinaryBackoff(
                 INITIAL_SCHEDULE_PENALTY.get(),
                 MAX_SCHEDULE_PENALTY.get()),
-            RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
+            RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()),
+            MAX_TASKS_PER_SCHEDULE_ATTEMPT.get()));
 
         bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
             .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
index 5d31955..b521620 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
@@ -13,15 +13,17 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Collection;
 import java.util.Queue;
 import java.util.Set;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 
+import static org.apache.aurora.GuavaUtils.toImmutableSet;
+
 /**
  * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
  */
@@ -41,16 +43,16 @@ class TaskGroup {
     return key;
   }
 
-  synchronized Optional<String> peek() {
-    return Optional.fromNullable(tasks.peek());
+  synchronized Set<String> peek(int maxTasks) {
+    return tasks.stream().limit(Math.min(tasks.size(), maxTasks)).collect(toImmutableSet());
   }
 
   synchronized boolean hasMore() {
     return !tasks.isEmpty();
   }
 
-  synchronized void remove(String taskId) {
-    tasks.remove(taskId);
+  synchronized void remove(Collection<String> taskIdsToRemove) {
+    tasks.removeAll(taskIdsToRemove);
   }
 
   synchronized void offer(String taskId) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index d390c07..77187bc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.scheduling;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -23,7 +24,6 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
@@ -71,11 +71,10 @@ public class TaskGroups implements EventSubscriber {
 
   private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
   private final DelayExecutor executor;
+  private final TaskGroupsSettings settings;
   private final TaskScheduler taskScheduler;
-  private final long firstScheduleDelay;
-  private final BackoffStrategy backoff;
   private final RescheduleCalculator rescheduleCalculator;
-  private final BatchWorker<Boolean> batchWorker;
+  private final BatchWorker<Set<String>> batchWorker;
 
   // Track the penalties of tasks at the time they were scheduled. This is to provide data that
   // may influence the selection of a different backoff strategy.
@@ -91,7 +90,7 @@ public class TaskGroups implements EventSubscriber {
   public @interface SchedulingMaxBatchSize { }
 
   @VisibleForTesting
-  public static class TaskGroupBatchWorker extends BatchWorker<Boolean> {
+  public static class TaskGroupBatchWorker extends BatchWorker<Set<String>> {
     @Inject
     TaskGroupBatchWorker(
         Storage storage,
@@ -111,15 +110,20 @@ public class TaskGroups implements EventSubscriber {
     private final Amount<Long, Time> firstScheduleDelay;
     private final BackoffStrategy taskGroupBackoff;
     private final RateLimiter rateLimiter;
+    private final int maxTasksPerSchedule;
 
     public TaskGroupsSettings(
         Amount<Long, Time> firstScheduleDelay,
         BackoffStrategy taskGroupBackoff,
-        RateLimiter rateLimiter) {
+        RateLimiter rateLimiter,
+        int maxTasksPerSchedule) {
 
       this.firstScheduleDelay = requireNonNull(firstScheduleDelay);
+      Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
       this.taskGroupBackoff = requireNonNull(taskGroupBackoff);
       this.rateLimiter = requireNonNull(rateLimiter);
+      this.maxTasksPerSchedule = maxTasksPerSchedule;
+      Preconditions.checkArgument(maxTasksPerSchedule > 0);
     }
   }
 
@@ -131,21 +135,11 @@ public class TaskGroups implements EventSubscriber {
       RescheduleCalculator rescheduleCalculator,
       TaskGroupBatchWorker batchWorker) {
 
-    requireNonNull(settings.firstScheduleDelay);
-    Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
-
     this.executor = requireNonNull(executor);
-    requireNonNull(settings.rateLimiter);
-    requireNonNull(taskScheduler);
-    this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
-    this.backoff = requireNonNull(settings.taskGroupBackoff);
+    this.settings = requireNonNull(settings);
+    this.taskScheduler = requireNonNull(taskScheduler);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
     this.batchWorker = requireNonNull(batchWorker);
-
-    this.taskScheduler = (store, taskId) -> {
-      settings.rateLimiter.acquire();
-      return taskScheduler.schedule(store, taskId);
-    };
   }
 
   private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
@@ -162,27 +156,29 @@ public class TaskGroups implements EventSubscriber {
     Runnable monitor = new Runnable() {
       @Override
       public void run() {
-        final Optional<String> taskId = group.peek();
+        final Set<String> taskIds = group.peek(settings.maxTasksPerSchedule);
         long penaltyMs = 0;
-        if (taskId.isPresent()) {
-          CompletableFuture<Boolean> result = batchWorker.execute(storeProvider ->
-              taskScheduler.schedule(storeProvider, taskId.get()));
-          boolean isScheduled = false;
+        if (!taskIds.isEmpty()) {
+          settings.rateLimiter.acquire();
+          CompletableFuture<Set<String>> result = batchWorker.execute(storeProvider ->
+              taskScheduler.schedule(storeProvider, taskIds));
+
+          Set<String> scheduled = null;
           try {
-            isScheduled = result.get();
+            scheduled = result.get();
           } catch (ExecutionException | InterruptedException e) {
             Thread.currentThread().interrupt();
             Throwables.propagate(e);
           }
 
-          if (isScheduled) {
+          if (scheduled.isEmpty()) {
+            penaltyMs = settings.taskGroupBackoff.calculateBackoffMs(group.getPenaltyMs());
+          } else {
             scheduledTaskPenalties.accumulate(group.getPenaltyMs());
-            group.remove(taskId.get());
+            group.remove(scheduled);
             if (group.hasMore()) {
-              penaltyMs = firstScheduleDelay;
+              penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS);
             }
-          } else {
-            penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
           }
         }
 
@@ -211,7 +207,7 @@ public class TaskGroups implements EventSubscriber {
       if (existing == null) {
         long penaltyMs;
         if (stateChange.isTransition()) {
-          penaltyMs = firstScheduleDelay;
+          penaltyMs = settings.firstScheduleDelay.as(Time.MILLISECONDS);
         } else {
           penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
         }
@@ -234,7 +230,7 @@ public class TaskGroups implements EventSubscriber {
         : Iterables.transform(deleted.getTasks(), IScheduledTask::getAssignedTask)) {
       TaskGroup group = groups.get(TaskGroupKey.from(task.getTask()));
       if (group != null) {
-        group.remove(task.getTaskId());
+        group.remove(ImmutableSet.of(task.getTaskId()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 207d38d..31edb1d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -15,14 +15,21 @@ package org.apache.aurora.scheduler.scheduling;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
@@ -51,6 +58,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
+import static java.util.stream.Collectors.toMap;
+
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
@@ -63,11 +72,11 @@ public interface TaskScheduler extends EventSubscriber {
    * Attempts to schedule a task, possibly performing irreversible actions.
    *
    * @param storeProvider {@code MutableStoreProvider} instance to access data store.
-   * @param taskId The task to attempt to schedule.
-   * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
-   *         call schedule again if {@code false} is returned.
+   * @param taskIds The tasks to attempt to schedule.
+   * @return Successfully scheduled task IDs. The caller should call schedule again if a given
+   *         task ID was not present in the result.
    */
-  boolean schedule(MutableStoreProvider storeProvider, String taskId);
+  Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
 
   /**
    * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -109,62 +118,84 @@ public interface TaskScheduler extends EventSubscriber {
     }
 
     @Timed ("task_schedule_attempt")
-    public boolean schedule(MutableStoreProvider store, String taskId) {
-      attemptsFired.incrementAndGet();
+    public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) {
       try {
-        return scheduleTask(store, taskId);
+        return scheduleTasks(store, taskIds);
       } catch (RuntimeException e) {
         // We catch the generic unchecked exception here to ensure tasks are not abandoned
         // if there is a transient issue resulting in an unchecked exception.
         LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
         attemptsFailed.incrementAndGet();
-        return false;
+        // Return empty set for all task IDs to be retried later.
+        // It's ok if some tasks were already assigned, those will be ignored in the next round.
+        return ImmutableSet.of();
       }
     }
 
-    private boolean scheduleTask(MutableStoreProvider store, String taskId) {
-      LOG.debug("Attempting to schedule task " + taskId);
-      IAssignedTask assignedTask = Iterables.getOnlyElement(
-          Iterables.transform(
-              store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
-              IScheduledTask::getAssignedTask),
-          null);
-
-      if (assignedTask == null) {
-        LOG.warn("Failed to look up task " + taskId + ", it may have been deleted.");
-      } else {
-        ITaskConfig task = assignedTask.getTask();
-        AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-
-        // Valid Docker tasks can have a container but no executor config
-        ResourceBag overhead = ResourceBag.EMPTY;
-        if (task.isSetExecutorConfig()) {
-          overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-              .orElseThrow(
-                  () -> new IllegalArgumentException("Cannot find executor configuration"));
-        }
+    private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) {
+      ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
+      String taskIdValues = Joiner.on(",").join(taskIds);
+      LOG.debug("Attempting to schedule tasks " + taskIdValues);
+      ImmutableSet<IAssignedTask> assignedTasks =
+          ImmutableSet.copyOf(Iterables.transform(
+              store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
+              IScheduledTask::getAssignedTask));
+
+      if (Iterables.isEmpty(assignedTasks)) {
+        LOG.warn("Failed to look up all tasks in a scheduling round: " + taskIdValues);
+        return taskIds;
+      }
 
-        boolean launched = assigner.maybeAssign(
-            store,
-            new ResourceRequest(
-                task,
-                bagFromResources(task.getResources()).add(overhead), aggregate),
-            TaskGroupKey.from(task),
-            taskId,
-            reservations.asMap());
-
-        if (!launched) {
-          // Task could not be scheduled.
-          // TODO(maxim): Now that preemption slots are searched asynchronously, consider
-          // retrying a launch attempt within the current scheduling round IFF a reservation is
-          // available.
-          maybePreemptFor(assignedTask, aggregate, store);
-          attemptsNoMatch.incrementAndGet();
-          return false;
-        }
+      Preconditions.checkState(
+          assignedTasks.stream()
+              .collect(Collectors.groupingBy(t -> t.getTask()))
+              .entrySet()
+              .size() == 1,
+          "Found multiple task groups for " + taskIdValues);
+
+      Map<String, IAssignedTask> assignableTaskMap =
+          assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+
+      if (taskIds.size() != assignedTasks.size()) {
+        LOG.warn("Failed to look up tasks "
+            + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+      }
+
+      // This is safe after all checks above.
+      ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+      AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
+
+      // Valid Docker tasks can have a container but no executor config
+      ResourceBag overhead = ResourceBag.EMPTY;
+      if (task.isSetExecutorConfig()) {
+        overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+            .orElseThrow(
+                () -> new IllegalArgumentException("Cannot find executor configuration"));
       }
 
-      return true;
+      Set<String> launched = assigner.maybeAssign(
+          store,
+          new ResourceRequest(
+              task,
+              bagFromResources(task.getResources()).add(overhead), aggregate),
+          TaskGroupKey.from(task),
+          assignableTaskMap.keySet(),
+          reservations.asMap());
+
+      attemptsFired.addAndGet(assignableTaskMap.size());
+      Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+
+      failedToLaunch.forEach(taskId -> {
+        // Task could not be scheduled.
+        // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+        // retrying a launch attempt within the current scheduling round IFF a reservation is
+        // available.
+        maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+      });
+      attemptsNoMatch.addAndGet(failedToLaunch.size());
+
+      // Return all successfully launched tasks plus those that weren't tried (not in PENDING).
+      return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
     }
 
     private void maybePreemptFor(

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7f7b435..4c61762 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -21,6 +22,8 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
@@ -60,15 +63,15 @@ public interface TaskAssigner {
    * @param storeProvider Storage provider.
    * @param resourceRequest The request for resources being scheduled.
    * @param groupKey Task group key.
-   * @param taskId Task id to assign.
+   * @param taskIds Task IDs to assign.
    * @param slaveReservations Slave reservations.
-   * @return Assignment result.
+   * @return Successfully assigned task IDs.
    */
-  boolean maybeAssign(
+  Set<String> maybeAssign(
       MutableStoreProvider storeProvider,
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
-      String taskId,
+      Iterable<String> taskIds,
       Map<String, TaskGroupKey> slaveReservations);
 
   class TaskAssignerImpl implements TaskAssigner {
@@ -132,13 +135,22 @@ public interface TaskAssigner {
 
     @Timed("assigner_maybe_assign")
     @Override
-    public boolean maybeAssign(
+    public Set<String> maybeAssign(
         MutableStoreProvider storeProvider,
         ResourceRequest resourceRequest,
         TaskGroupKey groupKey,
-        String taskId,
+        Iterable<String> taskIds,
         Map<String, TaskGroupKey> slaveReservations) {
 
+      if (Iterables.isEmpty(taskIds)) {
+        return ImmutableSet.of();
+      }
+
+      TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
+      ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+      Iterator<String> remainingTasks = taskIds.iterator();
+      String taskId = remainingTasks.next();
+
       for (HostOffer offer : offerManager.getOffers(groupKey)) {
         Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
             slaveReservations.get(offer.getOffer().getSlaveId().getValue()));
@@ -148,7 +160,6 @@ public interface TaskAssigner {
           continue;
         }
 
-        TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
         Set<Veto> vetoes = filter.filter(
             new UnusedResource(offer.getResourceBag(tierInfo), offer.getAttributes()),
             resourceRequest);
@@ -159,9 +170,17 @@ public interface TaskAssigner {
               offer.getOffer(),
               taskId);
 
+          resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+
           try {
             offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-            return true;
+            assignmentResult.add(taskId);
+
+            if (remainingTasks.hasNext()) {
+              taskId = remainingTasks.next();
+            } else {
+              break;
+            }
           } catch (OfferManager.LaunchException e) {
             LOG.warn("Failed to launch task.", e);
             launchFailures.incrementAndGet();
@@ -177,19 +196,19 @@ public interface TaskAssigner {
                 Optional.of(PENDING),
                 LOST,
                 LAUNCH_FAILED_MSG);
-            return false;
+            break;
           }
         } else {
           if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
             // Never attempt to match this offer/groupKey pair again.
             offerManager.banOffer(offer.getOffer().getId(), groupKey);
           }
-
           LOG.debug("Agent " + offer.getOffer().getHostname()
               + " vetoed task " + taskId + ": " + vetoes);
         }
       }
-      return false;
+
+      return assignmentResult.build();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index ece476b..b759427 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -49,7 +49,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
       ResourceManager.bagFromResources(TASK.getResources()),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
   private static final ResourceRequest REQUEST =
-      new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.EMPTY);
+      new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
 
   private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
index 209f929..7496a70 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
@@ -133,6 +133,38 @@ public class AttributeAggregateTest extends EasyMockTest {
     assertAggregate(aggregate, "hostc", "2", 0L);
   }
 
+  @Test
+  public void testUpdateAttributeAggregate() {
+    expectGetAttributes(
+        "a1",
+        attribute("host", "a1"),
+        attribute("rack", "a"),
+        attribute("pdu", "p1"));
+
+    control.replay();
+
+    Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder()
+        .add(Pair.of("rack", "a"))
+        .add(Pair.of("host", "a1"))
+        .add(Pair.of("pdu", "p1"))
+        .build();
+
+    AttributeAggregate aggregate = aggregate(task("1", "a1"));
+    assertEquals(expected, aggregate.getAggregates());
+
+    aggregate.updateAttributeAggregate(IHostAttributes.build(new HostAttributes()
+        .setHost("a2")
+        .setAttributes(ImmutableSet.of(attribute("host", "a2"), attribute("rack", "b")))));
+
+    expected = ImmutableMultiset.<Pair<String, String>>builder()
+        .addAll(expected)
+        .add(Pair.of("rack", "b"))
+        .add(Pair.of("host", "a2"))
+        .build();
+
+    assertEquals(expected, aggregate.getAggregates());
+  }
+
   private AttributeAggregate aggregate(IScheduledTask... activeTasks) {
     return AttributeAggregate.create(
         Suppliers.ofInstance(ImmutableSet.copyOf(activeTasks)),

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0cf23df..1d7f9f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -53,7 +53,7 @@ import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -128,22 +128,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(noPortTask, bag(noPortTask), EMPTY)));
+            new ResourceRequest(noPortTask, bag(noPortTask), empty())));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(onePortTask, bag(onePortTask), EMPTY)));
+            new ResourceRequest(onePortTask, bag(onePortTask), empty())));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(twoPortTask, bag(twoPortTask), EMPTY)));
+            new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
     assertEquals(
         ImmutableSet.of(veto(PORTS, 1)),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(threePortTask, bag(threePortTask), EMPTY)));
+            new ResourceRequest(threePortTask, bag(threePortTask), empty())));
   }
 
   @Test
@@ -409,7 +409,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.of(),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(task, bag(task), EMPTY)));
+            new ResourceRequest(task, bag(task), empty())));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -499,7 +499,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     return checkConstraint(
         job,
-        EMPTY,
+        empty(),
         hostAttributes,
         constraintName,
         expected,
@@ -537,7 +537,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
-    assertVetoes(task, hostAttributes, EMPTY);
+    assertVetoes(task, hostAttributes, empty());
   }
 
   private void assertNoVetoes(
@@ -549,7 +549,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
-    assertVetoes(task, hostAttributes, EMPTY, vetoes);
+    assertVetoes(task, hostAttributes, empty(), vetoes);
   }
 
   private void assertVetoes(

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index c1c3eca..fb03f25 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -120,7 +120,8 @@ public abstract class AbstractJettyTest extends EasyMockTest {
                 new TaskGroupsSettings(
                     Amount.of(1L, Time.MILLISECONDS),
                     bindMock(BackoffStrategy.class),
-                    RateLimiter.create(1000)));
+                    RateLimiter.create(1000),
+                    5));
             bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor);
             bindMock(CronJobManager.class);
             bindMock(LockManager.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index ee5c652..64da234 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -65,7 +65,7 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER;
 import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag;
@@ -129,7 +129,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     return filter.filterPreemptionVictims(
         ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
         preemptionVictims(victims),
-        EMPTY,
+        empty(),
         offer,
         storageUtil.mutableStoreProvider);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index 98048fa..40c42b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -45,7 +45,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName;
 import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName;
 import static org.easymock.EasyMock.anyObject;
@@ -133,7 +133,7 @@ public class PreemptorImplTest extends EasyMockTest {
   }
 
   private Optional<String> callPreemptor() {
-    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
+    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), empty(), storeProvider);
   }
 
   private void expectSlotValidation(
@@ -143,7 +143,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expect(preemptionVictimFilter.filterPreemptionVictims(
         TASK.getAssignedTask().getTask(),
         slot.getVictims(),
-        EMPTY,
+        empty(),
         Optional.of(OFFER),
         storeProvider)).andReturn(victims);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index 2c3e5f3..67b6d69 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -78,7 +78,7 @@ public class PreemptorModuleTest extends EasyMockTest {
         Optional.absent(),
         injector.getInstance(Preemptor.class).attemptPreemptionFor(
             IAssignedTask.build(new AssignedTask()),
-            AttributeAggregate.EMPTY,
+            AttributeAggregate.empty(),
             storageUtil.mutableStoreProvider));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 8872962..566e0d9 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Set;
+
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -27,8 +29,8 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -50,6 +52,7 @@ public class TaskGroupsTest extends EasyMockTest {
   private static final Amount<Long, Time> RESCHEDULE_DELAY = FIRST_SCHEDULE_DELAY;
   private static final IJobKey JOB_A = IJobKey.build(new JobKey("role", "test", "jobA"));
   private static final String TASK_A_ID = "a";
+  private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_A_ID);
 
   private BackoffStrategy backoffStrategy;
   private TaskScheduler taskScheduler;
@@ -73,7 +76,7 @@ public class TaskGroupsTest extends EasyMockTest {
     batchWorker = createMock(TaskGroupBatchWorker.class);
     taskGroups = new TaskGroups(
         executor,
-        new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
+        new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter, 2),
         taskScheduler,
         rescheduleCalculator,
         batchWorker);
@@ -82,8 +85,10 @@ public class TaskGroupsTest extends EasyMockTest {
   @Test
   public void testEvaluatedAfterFirstSchedulePenalty() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
-    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+    expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+        .andReturn(SCHEDULED_RESULT);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT)
+        .anyTimes();
 
     control.replay();
 
@@ -95,15 +100,17 @@ public class TaskGroupsTest extends EasyMockTest {
   public void testTaskDeletedBeforeEvaluating() throws Exception {
     final IScheduledTask task = makeTask(TASK_A_ID);
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(anyObject(), eq(Tasks.id(task)))).andAnswer(() -> {
-      // Test a corner case where a task is deleted while it is being evaluated by the task
-      // scheduler.  If not handled carefully, this could result in the scheduler trying again
-      // later to satisfy the deleted task.
-      taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
-
-      return false;
-    });
-    expectBatchExecute(batchWorker, storageUtil.storage, control, false).anyTimes();
+    expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+        .andAnswer(() -> {
+          // Test a corner case where a task is deleted while it is being evaluated by the task
+          // scheduler.  If not handled carefully, this could result in the scheduler trying again
+          // later to satisfy the deleted task.
+          taskGroups.tasksDeleted(new PubsubEvent.TasksDeleted(ImmutableSet.of(task)));
+
+          return ImmutableSet.of();
+        });
+    expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of())
+        .anyTimes();
     expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS)))
         .andReturn(0L);
 
@@ -117,8 +124,10 @@ public class TaskGroupsTest extends EasyMockTest {
   public void testEvaluatedOnStartup() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
     expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
-    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
-    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+    expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
+        .andReturn(ImmutableSet.of(TASK_A_ID));
+    expectBatchExecute(batchWorker, storageUtil.storage, control, SCHEDULED_RESULT)
+        .anyTimes();
 
     control.replay();
 
@@ -128,11 +137,19 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testResistStarvation() throws Exception {
+  public void testMultipleTasksAndResistStarvation() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D).times(2);
-    expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true);
-    expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true);
-    expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
+    expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("a0", "a1"))))
+        .andReturn(ImmutableSet.of("a0", "a1"));
+    expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("b0"))))
+        .andReturn(ImmutableSet.of("b0"));
+    expectBatchExecute(
+        batchWorker,
+        storageUtil.storage,
+        control,
+        ImmutableSet.of("a0", "a1")).anyTimes();
+    expectBatchExecute(batchWorker, storageUtil.storage, control, ImmutableSet.of("b0"))
+        .anyTimes();
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index a4e87d2..fa1a8178 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.scheduling;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import com.google.common.base.Function;
@@ -63,21 +64,23 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 public class TaskSchedulerImplTest extends EasyMockTest {
-
+  private static final String TASK_ID = "a";
   private static final IScheduledTask TASK_A =
-      TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
+      TaskTestUtil.makeTask(TASK_ID, JobKeys.from("a", "a", "a"));
   private static final TaskGroupKey GROUP_KEY =
       TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
   private static final String SLAVE_ID = "HOST_A";
   private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+  private static final ImmutableSet<String> SINGLE_TASK = ImmutableSet.of(TASK_ID);
+  private static final Set<String> SCHEDULED_RESULT = ImmutableSet.of(TASK_ID);
+  private static final Set<String> NOT_SCHEDULED_RESULT = ImmutableSet.of();
 
   private StorageTestUtil storageUtil;
   private TaskAssigner assigner;
@@ -134,15 +137,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
             .getName()).get());
   }
 
-  private IExpectationSetters<Boolean> expectAssigned(
+  private IExpectationSetters<Set<String>> expectAssigned(
       IScheduledTask task,
       Map<String, TaskGroupKey> reservationMap) {
 
     return expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
-        new ResourceRequest(task.getAssignedTask().getTask(), bag(task), EMPTY),
+        new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()),
         TaskGroupKey.from(task.getAssignedTask().getTask()),
-        Tasks.id(task),
+        ImmutableSet.of(Tasks.id(task)),
         reservationMap));
   }
 
@@ -153,11 +156,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAsMap(NO_RESERVATION);
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT);
 
     control.replay();
 
-    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
   }
 
   @Test
@@ -169,7 +174,45 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+  }
+
+  @Test
+  public void testSchedulePartial() throws Exception {
+    storageUtil.expectOperations();
+
+    String taskB = "b";
+    expectAsMap(NO_RESERVATION);
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING),
+        ImmutableSet.of(TASK_A));
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(SCHEDULED_RESULT);
+
+    control.replay();
+
+    // Task b should be returned as well to be purged from its TaskGroup.
+    assertEquals(
+        ImmutableSet.of(TASK_ID, taskB),
+        scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB)));
+  }
+
+  @Test
+  public void testMultipleGroupsRejected() {
+    storageUtil.expectOperations();
+
+    String taskB = "b";
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(TASK_A), taskB).byStatus(PENDING),
+        ImmutableSet.of(TASK_A, TaskTestUtil.makeTask(taskB, JobKeys.from("b", "b", "b"))));
+
+    control.replay();
+
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, ImmutableSet.of(TASK_ID, taskB)));
   }
 
   @Test
@@ -179,15 +222,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // No reservation available in preemptor
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
     expectAsMap(NO_RESERVATION);
     expectNoReservation(TASK_A);
-    expectPreemptorCall(TASK_A, Optional.<String>absent());
+    expectPreemptorCall(TASK_A, Optional.absent());
 
     // Slave is reserved.
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
     expectAsMap(NO_RESERVATION);
     expectNoReservation(TASK_A);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
@@ -197,13 +240,19 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAsMap(ImmutableMap.of(SLAVE_ID, GROUP_KEY));
-    expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(true);
+    expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(SCHEDULED_RESULT);
 
     control.replay();
 
-    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
-    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
-    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
+    assertEquals(
+        SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
   }
 
   @Test
@@ -213,12 +262,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAsMap(NO_RESERVATION);
-    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
     expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
-    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
   }
 
   @Test
@@ -228,12 +279,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAsMap(NO_RESERVATION);
-    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(NOT_SCHEDULED_RESULT);
     expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
-    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
   }
 
   @Test
@@ -274,15 +327,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAsMap(NO_RESERVATION);
     expect(assigner.maybeAssign(
         EasyMock.anyObject(),
-        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), EMPTY)),
+        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())),
         eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
-        eq(Tasks.id(taskA)),
-        eq(NO_RESERVATION))).andReturn(true);
+        eq(SINGLE_TASK),
+        eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT);
 
     control.replay();
 
     memStorage.write((NoResult.Quiet)
-        store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA))));
+        store -> assertEquals(SCHEDULED_RESULT, scheduler.schedule(store, SINGLE_TASK)));
   }
 
   @Test
@@ -296,13 +349,15 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertEquals(
+        NOT_SCHEDULED_RESULT,
+        scheduler.schedule(storageUtil.mutableStoreProvider, SINGLE_TASK));
   }
 
   private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
     expect(preemptor.attemptPreemptionFor(
         task.getAssignedTask(),
-        EMPTY,
+        empty(),
         storageUtil.mutableStoreProvider)).andReturn(result);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f559e930/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index b4d27f6..b482be5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.state;
 
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
@@ -21,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskConfig;
@@ -28,6 +30,7 @@ import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -57,7 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -70,8 +73,7 @@ import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 public class TaskAssignerImplTest extends EasyMockTest {
 
@@ -79,7 +81,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
   private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue();
   private static final HostOffer OFFER =
-      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+          .setHost(MESOS_OFFER.getHostname())
+          .setAttributes(ImmutableSet.of(
+              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
   private static final IScheduledTask TASK = makeTask("id", JOB);
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
@@ -91,10 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private static final UnusedResource UNUSED = new UnusedResource(
       bagFromMesosResources(MESOS_OFFER.getResourcesList()),
       OFFER.getAttributes());
-  private static final ResourceRequest RESOURCE_REQUEST = new ResourceRequest(
-      TASK.getAssignedTask().getTask(),
-      ResourceBag.EMPTY,
-      EMPTY);
+  private static final HostOffer OFFER_2 = new HostOffer(
+      Offer.newBuilder()
+          .setId(OfferID.newBuilder().setValue("offerId0"))
+              .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+              .setSlaveId(SlaveID.newBuilder().setValue("slaveId0"))
+              .setHostname("hostName0")
+          .addResources(Resource.newBuilder()
+          .setName("ports")
+          .setType(Type.RANGES)
+          .setRanges(
+              Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
+              .build(),
+      IHostAttributes.build(new HostAttributes()));
+
+  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+  private ResourceRequest resourceRequest;
 
   private MutableStoreProvider storeProvider;
   private StateManager stateManager;
@@ -113,26 +131,43 @@ public class TaskAssignerImplTest extends EasyMockTest {
     offerManager = createMock(OfferManager.class);
     tierManager = createMock(TierManager.class);
     assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager, tierManager);
+    resourceRequest = new ResourceRequest(
+        TASK.getAssignedTask().getTask(),
+        ResourceBag.EMPTY,
+        empty());
   }
 
   @Test
-  public void testAssignNoVetoes() throws Exception {
+  public void testAssignNoTasks() throws Exception {
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+  }
+
+  @Test
+  public void testAssignPartialNoVetoes() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
     expectAssignTask(MESOS_OFFER);
     expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
         .andReturn(TASK_INFO);
 
     control.replay();
 
-    assertTrue(assigner.maybeAssign(
-        storeProvider,
-        new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, EMPTY),
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    AttributeAggregate aggregate = empty();
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
   }
 
   @Test
@@ -140,43 +175,47 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, RESOURCE_REQUEST))
+    expect(filter.filter(UNUSED, resourceRequest))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
 
-    assertFalse(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        NO_RESERVATION));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK)),
+            NO_RESERVATION));
   }
 
   @Test
   public void testAssignVetoesWithNoStaticBan() throws Exception {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, RESOURCE_REQUEST))
+    expect(filter.filter(UNUSED, resourceRequest))
         .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
 
     control.replay();
 
-    assertFalse(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        NO_RESERVATION));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK)),
+            NO_RESERVATION));
   }
 
   @Test
   public void testAssignmentClearedOnError() throws Exception {
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
     expectAssignTask(MESOS_OFFER);
     expect(stateManager.changeState(
         storeProvider,
@@ -190,27 +229,33 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        NO_RESERVATION));
+    // Ensures scheduling loop terminates on the first launch failure.
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+            NO_RESERVATION));
   }
 
   @Test
   public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
 
     control.replay();
 
-    assertFalse(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
-            ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
   }
 
   @Test
@@ -218,36 +263,28 @@ public class TaskAssignerImplTest extends EasyMockTest {
     // Ensures slave/task reservation relationship is only enforced in slave->task direction
     // and permissive in task->slave direction. In other words, a task with a slave reservation
     // should still be tried against other unreserved slaves.
-    HostOffer offer = new HostOffer(
-        Offer.newBuilder()
-            .setId(OfferID.newBuilder().setValue("offerId0"))
-            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-            .setSlaveId(SlaveID.newBuilder().setValue("slaveId0"))
-            .setHostname("hostName0")
-            .addResources(Resource.newBuilder()
-                .setName("ports")
-                .setType(Type.RANGES)
-                .setRanges(
-                    Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
-            .build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
-    expectAssignTask(offer.getOffer());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
+    expect(filter.filter(
+        new UnusedResource(
+            bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
+            OFFER_2.getAttributes()),
+        resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(OFFER_2.getOffer());
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer()))
         .andReturn(TASK_INFO);
-    offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
+    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
 
     control.replay();
 
-    assertTrue(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
 
   @Test
@@ -268,18 +305,18 @@ public class TaskAssignerImplTest extends EasyMockTest {
         IHostAttributes.build(new HostAttributes()));
 
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER).times(2);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(
         new UnusedResource(
             bagFromMesosResources(mismatched.getOffer().getResourcesList()),
             mismatched.getAttributes()),
-        RESOURCE_REQUEST))
+        resourceRequest))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
     offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY);
     expect(filter.filter(
         new UnusedResource(
             bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
-        RESOURCE_REQUEST))
+        resourceRequest))
         .andReturn(ImmutableSet.of());
 
     expectAssignTask(MESOS_OFFER);
@@ -289,12 +326,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(assigner.maybeAssign(
-        storeProvider,
-        RESOURCE_REQUEST,
-        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-        Tasks.id(TASK),
-        ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
 
   @Test