You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/20 00:53:25 UTC

incubator-aurora git commit: Simplify Preemptor code, encapsulate parameters used there and in SchedulingFilter.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 4e52d00b8 -> 5cdc3900c


Simplify Preemptor code, encapsulate parameters used there and in SchedulingFilter.

Reviewed at https://reviews.apache.org/r/28103/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5cdc3900
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5cdc3900
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5cdc3900

Branch: refs/heads/master
Commit: 5cdc3900cef5cb99435e1cd8056bebf118d9043c
Parents: 4e52d00
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 19 15:49:36 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Nov 19 15:49:36 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/Preemptor.java       | 117 ++++++--------
 .../aurora/scheduler/async/TaskScheduler.java   |  24 +--
 .../org/apache/aurora/scheduler/base/Tasks.java |   9 --
 .../configuration/ConfigurationManager.java     |   6 +-
 .../events/NotifyingSchedulingFilter.java       |  16 +-
 .../scheduler/filter/ConstraintFilter.java      | 139 -----------------
 .../scheduler/filter/ConstraintMatcher.java     | 131 ++++++++++++++++
 .../scheduler/filter/SchedulingFilter.java      | 103 +++++++++++--
 .../scheduler/filter/SchedulingFilterImpl.java  | 151 ++++++-------------
 .../aurora/scheduler/state/TaskAssigner.java    |  45 +++---
 .../aurora/scheduler/stats/ResourceCounter.java |   4 +-
 .../scheduler/async/PreemptorImplTest.java      |  25 ++-
 .../scheduler/async/TaskSchedulerImplTest.java  |  14 +-
 .../scheduler/async/TaskSchedulerTest.java      |  23 +--
 .../events/NotifyingSchedulingFilterTest.java   |  21 +--
 .../filter/SchedulingFilterImplTest.java        |  63 +++++---
 .../scheduler/state/TaskAssignerImplTest.java   |  26 ++--
 17 files changed, 460 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index ff26c49..97d5d13 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -26,6 +26,7 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -50,11 +51,14 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -106,14 +110,6 @@ public interface Preemptor {
     static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
         EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
 
-    private static final Function<IAssignedTask, Integer> GET_PRIORITY =
-        new Function<IAssignedTask, Integer>() {
-          @Override
-          public Integer apply(IAssignedTask task) {
-            return task.getTask().getPriority();
-          }
-        };
-
     private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
     // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
     private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
@@ -184,20 +180,11 @@ public interface Preemptor {
           }
         };
 
-    private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
-      return new Predicate<IAssignedTask>() {
-        @Override
-        public boolean apply(IAssignedTask possibleVictim) {
-          return preemptionFilter(possibleVictim).apply(pending);
-        }
-      };
-    }
-
     private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
         new Function<IAssignedTask, ResourceSlot>() {
           @Override
-          public ResourceSlot apply(IAssignedTask input) {
-            return ResourceSlot.from(input.getTask());
+          public ResourceSlot apply(IAssignedTask task) {
+            return ResourceSlot.from(task.getTask());
           }
         };
 
@@ -239,7 +226,7 @@ public interface Preemptor {
         Iterable<IAssignedTask> possibleVictims,
         Iterable<HostOffer> offers,
         IAssignedTask pendingTask,
-        AttributeAggregate attributeAggregate) {
+        AttributeAggregate jobState) {
 
       // This enforces the precondition that all of the resources are from the same host. We need to
       // get the host for the schedulingFilter.
@@ -263,19 +250,18 @@ public interface Preemptor {
             FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
 
         Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
-            slackResources,
-            attributes,
-            pendingTask.getTask(),
-            pendingTask.getTaskId(),
-            attributeAggregate);
+            new UnusedResource(slackResources, attributes),
+            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
 
         if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
         }
       }
 
-      FluentIterable<IAssignedTask> preemptableTasks =
-          FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
+      FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
+          .filter(Predicates.compose(
+              preemptionFilter(pendingTask.getTask()),
+              Tasks.ASSIGNED_TO_INFO));
 
       if (preemptableTasks.isEmpty()) {
         return Optional.absent();
@@ -299,11 +285,8 @@ public interface Preemptor {
         }
 
         Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
-            totalResource,
-            attributes.get(),
-            pendingTask.getTask(),
-            pendingTask.getTaskId(),
-            attributeAggregate);
+            new UnusedResource(totalResource, attributes.get()),
+            new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
 
         if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -329,12 +312,23 @@ public interface Preemptor {
           }
         };
 
+    /**
+     * Order by production flag (true, then false), subsorting by task ID.
+     * TODO(wfarner): Re-evaluate - what do we gain from sorting by task ID?
+     */
+    private static final Ordering<IAssignedTask> SCHEDULING_ORDER =
+        Ordering.explicit(true, false)
+            .onResultOf(Functions.compose(
+                Functions.forPredicate(Tasks.IS_PRODUCTION),
+                Tasks.ASSIGNED_TO_INFO))
+            .compound(Ordering.natural().onResultOf(Tasks.ASSIGNED_TO_ID));
+
     private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
       // Only non-pending active tasks may be preempted.
       List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
 
       // Walk through the preemption candidates in reverse scheduling order.
-      Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
+      Collections.sort(activeTasks, SCHEDULING_ORDER.reverse());
 
       // Group the tasks by slave id so they can be paired with offers from the same slave.
       return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
@@ -402,9 +396,6 @@ public interface Preemptor {
       return Optional.absent();
     }
 
-    private static final Predicate<IAssignedTask> IS_PRODUCTION =
-        Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
-
     /**
      * Creates a static filter that will identify tasks that may preempt the provided task.
      * A task may preempt another task if the following conditions hold true:
@@ -412,48 +403,32 @@ public interface Preemptor {
      * - The tasks are owned by the same user and the priority of {@code preemptableTask} is lower
      *     OR {@code preemptableTask} is non-production and the compared task is production.
      *
-     * @param preemptableTask Task to possibly preempt.
+     * @param pendingTask An unscheduled (pending) task for which other tasks may be preempted.
      * @return A filter that will compare the priorities and resources required by other tasks
      *     with {@code preemptableTask}.
      */
-    private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
-      Predicate<IAssignedTask> preemptableIsProduction = preemptableTask.getTask().isProduction()
-          ? Predicates.<IAssignedTask>alwaysTrue()
-          : Predicates.<IAssignedTask>alwaysFalse();
-
-      Predicate<IAssignedTask> priorityFilter =
-          greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
-      return Predicates.or(
-          Predicates.and(Predicates.not(preemptableIsProduction), IS_PRODUCTION),
-          Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter)
-      );
-    }
-
-    private static Predicate<IAssignedTask> isOwnedBy(final String role) {
-      return new Predicate<IAssignedTask>() {
-        @Override
-        public boolean apply(IAssignedTask task) {
-          return getRole(task).equals(role);
-        }
-      };
-    }
-
-    private static String getRole(IAssignedTask task) {
-      return task.getTask().getJob().getRole();
-    }
-
-    private static Predicate<Integer> greaterThan(final int value) {
-      return new Predicate<Integer>() {
+    private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
+      return new Predicate<ITaskConfig>() {
         @Override
-        public boolean apply(Integer input) {
-          return input > value;
+        public boolean apply(ITaskConfig possibleVictim) {
+          boolean pendingIsProduction = pendingTask.isProduction();
+          boolean victimIsProduction = possibleVictim.isProduction();
+
+          if (pendingIsProduction && !victimIsProduction) {
+            return true;
+          } else if (pendingIsProduction == victimIsProduction) {
+            // If production flags are equal, preemption is based on priority within the same role.
+            if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+              return pendingTask.getPriority() > possibleVictim.getPriority();
+            } else {
+              return false;
+            }
+          } else {
+            return false;
+          }
         }
       };
     }
-
-    private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
-      return Predicates.compose(greaterThan(priority), GET_PRIORITY);
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 6bfa3ac..f247ccf 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -47,6 +47,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -55,6 +56,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
 
@@ -129,9 +131,7 @@ public interface TaskScheduler extends EventSubscriber {
 
     private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
         final MutableStoreProvider storeProvider,
-        final AttributeAggregate attributeAggregate,
-        final String taskId,
-        final IScheduledTask task) {
+        final ResourceRequest resourceRequest) {
 
       // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
       // and perform the assignment at the very end.  This will allow us to use optimistic locking
@@ -142,16 +142,16 @@ public interface TaskScheduler extends EventSubscriber {
           Optional<String> reservedTaskId =
               reservations.getSlaveReservation(offer.getOffer().getSlaveId());
           if (reservedTaskId.isPresent()) {
-            if (taskId.equals(reservedTaskId.get())) {
+            if (resourceRequest.getTaskId().equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
+              return assigner.maybeAssign(storeProvider, offer, resourceRequest);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
+            return assigner.maybeAssign(storeProvider, offer, resourceRequest);
           }
         }
       };
@@ -189,16 +189,18 @@ public interface TaskScheduler extends EventSubscriber {
           @Override
           public Boolean apply(MutableStoreProvider store) {
             LOG.fine("Attempting to schedule task " + taskId);
-            final IScheduledTask task = Iterables.getOnlyElement(
-                store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+            final ITaskConfig task = Iterables.getOnlyElement(
+                Iterables.transform(
+                    store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+                    Tasks.SCHEDULED_TO_INFO),
                 null);
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
-              AttributeAggregate aggregate =
-                  getJobState(store, Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
+              AttributeAggregate aggregate = getJobState(store, task.getJob());
               try {
-                if (!offerQueue.launchFirst(getAssignerFunction(store, aggregate, taskId, task))) {
+                ResourceRequest resourceRequest = new ResourceRequest(task, taskId, aggregate);
+                if (!offerQueue.launchFirst(getAssignerFunction(store, resourceRequest))) {
                   // Task could not be scheduled.
                   maybePreemptFor(taskId, aggregate);
                   attemptsNoMatch.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index a2997f5..52d37e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -160,15 +160,6 @@ public final class Tasks {
         }
       };
 
-  /**
-   * Order by production flag (true, then false), subsorting by task ID.
-   * TODO(Suman Karumuri): Move this call into SchedulerThriftInterface once SchedulerzRole is gone.
-   */
-  public static final Ordering<IAssignedTask> SCHEDULING_ORDER =
-      Ordering.explicit(true, false)
-          .onResultOf(Functions.compose(Functions.forPredicate(IS_PRODUCTION), ASSIGNED_TO_INFO))
-          .compound(Ordering.natural().onResultOf(ASSIGNED_TO_ID));
-
   private Tasks() {
     // Utility class.
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 11a80f6..01b0350 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -140,7 +140,7 @@ public final class ConfigurationManager {
           new Closure<TaskConfig>() {
             @Override
             public void execute(TaskConfig task) {
-              if (!isDedicated(ITaskConfig.build(task))
+              if (!isDedicated(IConstraint.setFromBuilders(task.getConstraints()))
                   && task.isProduction()
                   && task.isIsService()
                   && !Iterables.any(task.getConstraints(), hasName(RACK_CONSTRAINT))) {
@@ -202,8 +202,8 @@ public final class ConfigurationManager {
     return taskConstraint.getSetField() == TaskConstraint._Fields.VALUE;
   }
 
-  public static boolean isDedicated(ITaskConfig task) {
-    return Iterables.any(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE));
+  public static boolean isDedicated(Iterable<IConstraint> taskConstraints) {
+    return Iterables.any(taskConstraints, getConstraintByName(DEDICATED_ATTRIBUTE));
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index ca53303..edaf2f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,12 +20,8 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
-import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -58,16 +54,10 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   }
 
   @Override
-  public Set<Veto> filter(
-      ResourceSlot offer,
-      IHostAttributes hostAttributes,
-      ITaskConfig task,
-      String taskId,
-      AttributeAggregate jobState) {
-
-    Set<Veto> vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState);
+  public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
+    Set<Veto> vetoes = delegate.filter(resource, request);
     if (!vetoes.isEmpty()) {
-      eventSink.post(new Vetoed(taskId, vetoes));
+      eventSink.post(new Vetoed(request.getTaskId(), vetoes));
     }
 
     return vetoes;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
deleted file mode 100644
index 3839083..0000000
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.filter;
-
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
-import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Filter that determines whether a task's constraints are satisfied.
- */
-class ConstraintFilter {
-  private final AttributeAggregate cachedjobState;
-  private final Iterable<IAttribute> hostAttributes;
-
-  /**
-   * Creates a new constraint filer for a given job.
-   *
-   * @param cachedjobState Cached information about the job containing the task being matched.
-   * @param hostAttributes The attributes of the host to test against.
-   */
-  ConstraintFilter(AttributeAggregate cachedjobState, Iterable<IAttribute> hostAttributes) {
-    this.cachedjobState = requireNonNull(cachedjobState);
-    this.hostAttributes = requireNonNull(hostAttributes);
-  }
-
-  @VisibleForTesting
-  static Veto limitVeto(String limit) {
-    return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
-  }
-
-  @VisibleForTesting
-  static Veto mismatchVeto(String constraint) {
-    return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
-  }
-
-  @VisibleForTesting
-  static Veto maintenanceVeto(String reason) {
-    return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
-  }
-
-  private static final Function<IAttribute, Set<String>> GET_VALUES =
-      new Function<IAttribute, Set<String>>() {
-        @Override
-        public Set<String> apply(IAttribute attribute) {
-          return attribute.getValues();
-        }
-      };
-
-  /**
-   * Gets the veto (if any) for a scheduling constraint based on the {@link AttributeAggregate} this
-   * filter was created with.
-   *
-   * @param constraint Scheduling filter to check.
-   * @return A veto if the constraint is not satisfied based on the existing state of the job.
-   */
-  Optional<Veto> getVeto(IConstraint constraint) {
-    Iterable<IAttribute> sameNameAttributes =
-        Iterables.filter(hostAttributes, new NameFilter(constraint.getName()));
-    Optional<IAttribute> attribute;
-    if (Iterables.isEmpty(sameNameAttributes)) {
-      attribute = Optional.absent();
-    } else {
-      Set<String> attributeValues = ImmutableSet.copyOf(
-          Iterables.concat(Iterables.transform(sameNameAttributes, GET_VALUES)));
-      attribute =
-          Optional.of(IAttribute.build(new Attribute(constraint.getName(), attributeValues)));
-    }
-
-    ITaskConstraint taskConstraint = constraint.getConstraint();
-    switch (taskConstraint.getSetField()) {
-      case VALUE:
-        boolean matches = AttributeFilter.matches(
-            attribute.transform(GET_VALUES).or(ImmutableSet.<String>of()),
-            taskConstraint.getValue());
-        return matches
-            ? Optional.<Veto>absent()
-            : Optional.of(mismatchVeto(constraint.getName()));
-
-      case LIMIT:
-        if (!attribute.isPresent()) {
-          return Optional.of(mismatchVeto(constraint.getName()));
-        }
-
-        boolean satisfied = AttributeFilter.matches(
-            attribute.get(),
-            taskConstraint.getLimit().getLimit(),
-            cachedjobState);
-        return satisfied
-            ? Optional.<Veto>absent()
-            : Optional.of(limitVeto(constraint.getName()));
-
-      default:
-        throw new SchedulerException("Failed to recognize the constraint type: "
-            + taskConstraint.getSetField());
-    }
-  }
-
-  /**
-   * A filter to find attributes matching a name.
-   */
-  static class NameFilter implements Predicate<IAttribute> {
-    private final String attributeName;
-
-    NameFilter(String attributeName) {
-      this.attributeName = attributeName;
-    }
-
-    @Override
-    public boolean apply(IAttribute attribute) {
-      return attributeName.equals(attribute.getName());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
new file mode 100644
index 0000000..cc8c5b8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
+
+/**
+ * Filter that determines whether a task's constraints are satisfied.
+ */
+final class ConstraintMatcher {
+  private ConstraintMatcher() {
+    // Utility class.
+  }
+
+  @VisibleForTesting
+  static Veto limitVeto(String limit) {
+    return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
+  }
+
+  @VisibleForTesting
+  static Veto mismatchVeto(String constraint) {
+    return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
+  }
+
+  @VisibleForTesting
+  static Veto maintenanceVeto(String reason) {
+    return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
+  }
+
+  private static final Function<IAttribute, Set<String>> GET_VALUES =
+      new Function<IAttribute, Set<String>>() {
+        @Override
+        public Set<String> apply(IAttribute attribute) {
+          return attribute.getValues();
+        }
+      };
+
+  /**
+   * Gets the veto (if any) for a scheduling constraint, evaluated against the provided
+   * {@link AttributeAggregate} and host attributes.
+   *
+   * @param constraint Scheduling constraint to check.
+   * @return A veto if the constraint is not satisfied based on the existing state of the job.
+   */
+  static Optional<Veto> getVeto(
+      AttributeAggregate cachedjobState,
+      Iterable<IAttribute> hostAttributes,
+      IConstraint constraint) {
+
+    Iterable<IAttribute> sameNameAttributes =
+        Iterables.filter(hostAttributes, new NameFilter(constraint.getName()));
+    Optional<IAttribute> attribute;
+    if (Iterables.isEmpty(sameNameAttributes)) {
+      attribute = Optional.absent();
+    } else {
+      Set<String> attributeValues = ImmutableSet.copyOf(
+          Iterables.concat(Iterables.transform(sameNameAttributes, GET_VALUES)));
+      attribute =
+          Optional.of(IAttribute.build(new Attribute(constraint.getName(), attributeValues)));
+    }
+
+    ITaskConstraint taskConstraint = constraint.getConstraint();
+    switch (taskConstraint.getSetField()) {
+      case VALUE:
+        boolean matches = AttributeFilter.matches(
+            attribute.transform(GET_VALUES).or(ImmutableSet.<String>of()),
+            taskConstraint.getValue());
+        return matches
+            ? Optional.<Veto>absent()
+            : Optional.of(mismatchVeto(constraint.getName()));
+
+      case LIMIT:
+        if (!attribute.isPresent()) {
+          return Optional.of(mismatchVeto(constraint.getName()));
+        }
+
+        boolean satisfied = AttributeFilter.matches(
+            attribute.get(),
+            taskConstraint.getLimit().getLimit(),
+            cachedjobState);
+        return satisfied
+            ? Optional.<Veto>absent()
+            : Optional.of(limitVeto(constraint.getName()));
+
+      default:
+        throw new SchedulerException("Failed to recognize the constraint type: "
+            + taskConstraint.getSetField());
+    }
+  }
+
+  /**
+   * A filter to find attributes matching a name.
+   */
+  static class NameFilter implements Predicate<IAttribute> {
+    private final String attributeName;
+
+    NameFilter(String attributeName) {
+      this.attributeName = attributeName;
+    }
+
+    @Override
+    public boolean apply(IAttribute attribute) {
+      return attributeName.equals(attribute.getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index c1c5f26..723e7ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -19,6 +19,7 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -102,19 +103,101 @@ public interface SchedulingFilter {
   }
 
   /**
+   * An available resource in the cluster: an offered {@link ResourceSlot} and host attributes.
+   */
+  class UnusedResource {
+    private final ResourceSlot offer;
+    private final IHostAttributes attributes;
+
+    public UnusedResource(ResourceSlot offer, IHostAttributes attributes) {
+      this.offer = offer;
+      this.attributes = attributes;
+    }
+
+    public ResourceSlot getResourceSlot() {
+      return offer;
+    }
+
+    public IHostAttributes getAttributes() {
+      return attributes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof UnusedResource)) {
+        return false;
+      }
+
+      UnusedResource other = (UnusedResource) o;
+      return Objects.equals(getResourceSlot(), other.getResourceSlot())
+          && Objects.equals(getAttributes(), other.getAttributes());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(offer, attributes);
+    }
+  }
+
+  /**
+   * A request for resources in the cluster, made of a task config, task ID, and job state.
+   */
+  class ResourceRequest {
+    private final ITaskConfig task;
+    private final String taskId;
+    private final AttributeAggregate jobState;
+
+    public ResourceRequest(ITaskConfig task, String taskId, AttributeAggregate jobState) {
+      this.task = task;
+      this.taskId = taskId;
+      this.jobState = jobState;
+    }
+
+    public Iterable<IConstraint> getConstraints() {
+      return task.getConstraints();
+    }
+
+    public ResourceSlot getResourceSlot() {
+      return ResourceSlot.from(task);
+    }
+
+    public AttributeAggregate getJobState() {
+      return jobState;
+    }
+
+    public String getTaskId() {
+      return taskId;
+    }
+
+    public int getNumRequestedPorts() {
+      return task.getRequestedPorts().size();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof ResourceRequest)) {
+        return false;
+      }
+
+      ResourceRequest other = (ResourceRequest) o;
+      return Objects.equals(task, other.task)
+          && Objects.equals(getTaskId(), other.getTaskId())
+          && Objects.equals(getJobState(), other.getJobState());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(task, taskId, jobState);
+    }
+  }
+
+  /**
    * Applies a task against the filter with the given resources, and on the host.
    *
-   * @param offer Resources offered.
-   * @param task Task.
-   * @param taskId Canonical ID of the task.
-   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
+   * @param resource An available resource in the cluster.
+   * @param request A resource request to match against the {@code resource}.
    * @return A set of vetoes indicating reasons the task cannot be scheduled.  If the task may be
    *    scheduled, the set will be empty.
    */
-  Set<Veto> filter(
-      ResourceSlot offer,
-      IHostAttributes attributes,
-      ITaskConfig task,
-      String taskId,
-      AttributeAggregate attributeAggregate);
+  Set<Veto> filter(UnusedResource resource, ResourceRequest request);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index cc6b53b..cf3bb64 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -18,7 +18,6 @@ import java.util.EnumSet;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -31,9 +30,9 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
@@ -46,7 +45,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
 /**
  * Implementation of the scheduling filter that ensures resource requirements of tasks are
  * fulfilled, and that tasks are allowed to run on the given machine.
- *
  */
 public class SchedulingFilterImpl implements SchedulingFilter {
 
@@ -57,23 +55,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
 
-  /**
-   * A function that may veto a task.
-   */
-  private interface FilterRule extends Function<ITaskConfig, Iterable<Veto>> { }
-
-  /**
-   * Convenience class for a rule that will only ever have a single veto.
-   */
-  private abstract static class AbstractSingleVetoRule implements FilterRule {
-    @Override
-    public final Iterable<Veto> apply(ITaskConfig task) {
-      return doApply(task).asSet();
-    }
-
-    abstract Optional<Veto> doApply(ITaskConfig task);
-  }
-
   // Scaling ranges to use for comparison of vetos.  This has no real bearing besides trying to
   // determine if a veto along one resource vector is a 'stronger' veto than that of another vector.
   // The values below represent the maximum resources on a typical slave machine.
@@ -115,39 +96,25 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     }
   }
 
-  private Iterable<FilterRule> rulesFromOffer(final ResourceSlot available) {
-    return ImmutableList.<FilterRule>of(
-        new AbstractSingleVetoRule() {
-          @Override
-          public Optional<Veto> doApply(ITaskConfig task) {
-            return CPU.maybeVeto(
-                available.getNumCpus(),
-                ResourceSlot.from(task).getNumCpus());
-          }
-        },
-        new AbstractSingleVetoRule() {
-          @Override
-          public Optional<Veto> doApply(ITaskConfig task) {
-            return RAM.maybeVeto(
-                available.getRam().as(Data.MB),
-                ResourceSlot.from(task).getRam().as(Data.MB));
-          }
-        },
-        new AbstractSingleVetoRule() {
-          @Override
-          public Optional<Veto> doApply(ITaskConfig task) {
-            return DISK.maybeVeto(available.getDisk().as(Data.MB),
-                ResourceSlot.from(task).getDisk().as(Data.MB));
-          }
-        },
-        new AbstractSingleVetoRule() {
-          @Override
-          public Optional<Veto> doApply(ITaskConfig task) {
-            return PORTS.maybeVeto(available.getNumPorts(),
-                ResourceSlot.from(task).getNumPorts());
-          }
-        }
-    );
+  private static void maybeAddVeto(
+      ImmutableList.Builder<Veto> vetoes,
+      ResourceVector vector,
+      double available,
+      double requested) {
+
+    Optional<Veto> veto = vector.maybeVeto(available, requested);
+    if (veto.isPresent()) {
+      vetoes.add(veto.get());
+    }
+  }
+
+  private static Iterable<Veto> getResourceVetoes(ResourceSlot available, ResourceSlot required) {
+    ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+    maybeAddVeto(vetoes, CPU, available.getNumCpus(), required.getNumCpus());
+    maybeAddVeto(vetoes, RAM, available.getRam().as(Data.MB), required.getRam().as(Data.MB));
+    maybeAddVeto(vetoes, DISK, available.getDisk().as(Data.MB), required.getDisk().as(Data.MB));
+    maybeAddVeto(vetoes, PORTS, available.getNumPorts(), required.getNumPorts());
+    return vetoes.build();
   }
 
   private static boolean isValueConstraint(IConstraint constraint) {
@@ -165,78 +132,58 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         }
       });
 
-  private FilterRule getConstraintFilter(
-      final AttributeAggregate jobState,
-      final IHostAttributes offerAttributes) {
-
-    return new FilterRule() {
-      @Override
-      public Iterable<Veto> apply(final ITaskConfig task) {
-        if (!task.isSetConstraints()) {
-          return ImmutableList.of();
-        }
-
-        ConstraintFilter constraintFilter = new ConstraintFilter(
-            jobState,
-            offerAttributes.getAttributes());
-        ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
-        for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
-          Optional<Veto> veto = constraintFilter.getVeto(constraint);
-          if (veto.isPresent()) {
-            vetoes.add(veto.get());
-            if (isValueConstraint(constraint)) {
-              // Break when a value constraint mismatch is found to avoid other
-              // potentially-expensive operations to satisfy other constraints.
-              break;
-            }
-          }
+  private Iterable<Veto> getConstraintVetoes(
+      Iterable<IConstraint> taskConstraints,
+      AttributeAggregate jobState,
+      Iterable<IAttribute> offerAttributes) {
+
+    ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+    for (IConstraint constraint : VALUES_FIRST.sortedCopy(taskConstraints)) {
+      Optional<Veto> veto = ConstraintMatcher.getVeto(jobState, offerAttributes, constraint);
+      if (veto.isPresent()) {
+        vetoes.add(veto.get());
+        if (isValueConstraint(constraint)) {
+          // Break when a value constraint mismatch is found to avoid other
+          // potentially-expensive operations to satisfy other constraints.
+          break;
         }
-
-        return vetoes.build();
       }
-    };
+    }
+
+    return vetoes.build();
   }
 
   private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
     return VETO_MODES.contains(mode)
-        ? Optional.of(ConstraintFilter.maintenanceVeto(mode.toString().toLowerCase()))
+        ? Optional.of(ConstraintMatcher.maintenanceVeto(mode.toString().toLowerCase()))
         : NO_VETO;
   }
 
-  private Set<Veto> getResourceVetoes(ResourceSlot offer, ITaskConfig task) {
-    ImmutableSet.Builder<Veto> builder = ImmutableSet.builder();
-    for (FilterRule rule : rulesFromOffer(offer)) {
-      builder.addAll(rule.apply(task));
-    }
-    return builder.build();
-  }
-
   private boolean isDedicated(IHostAttributes attributes) {
     return Iterables.any(
         attributes.getAttributes(),
-        new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+        new ConstraintMatcher.NameFilter(DEDICATED_ATTRIBUTE));
   }
 
   @Override
-  public Set<Veto> filter(
-      ResourceSlot offer,
-      IHostAttributes attributes,
-      ITaskConfig task,
-      String taskId,
-      AttributeAggregate attributeAggregate) {
-
-    if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) {
+  public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
+    if (!ConfigurationManager.isDedicated(request.getConstraints())
+        && isDedicated(resource.getAttributes())) {
+
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
 
-    Optional<Veto> maintenanceVeto = getMaintenanceVeto(attributes.getMode());
+    Optional<Veto> maintenanceVeto = getMaintenanceVeto(resource.getAttributes().getMode());
     if (maintenanceVeto.isPresent()) {
       return maintenanceVeto.asSet();
     }
 
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(attributeAggregate, attributes).apply(task))
-        .addAll(getResourceVetoes(offer, task))
+        .addAll(getConstraintVetoes(
+            request.getConstraints(),
+            request.getJobState(),
+            resource.getAttributes().getAttributes()))
+        .addAll(getResourceVetoes(resource.getResourceSlot(), request.getResourceSlot()))
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 4abc7ba..e1c2974 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -22,14 +22,13 @@ import com.google.common.base.Optional;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
@@ -48,15 +47,13 @@ public interface TaskAssigner {
    *
    * @param storeProvider Storage provider.
    * @param offer The resource offer.
-   * @param task The task to match against and optionally assign.
-   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
+   * @param resourceRequest The request for resources being scheduled.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
   Optional<TaskInfo> maybeAssign(
       MutableStoreProvider storeProvider,
       HostOffer offer,
-      IScheduledTask task,
-      AttributeAggregate attributeAggregate);
+      ResourceRequest resourceRequest);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -76,18 +73,22 @@ public interface TaskAssigner {
       this.taskFactory = requireNonNull(taskFactory);
     }
 
-    private TaskInfo assign(MutableStoreProvider storeProvider, Offer offer, IScheduledTask task) {
+    private TaskInfo assign(
+        MutableStoreProvider storeProvider,
+        Offer offer,
+        int numRequestedPorts,
+        String taskId) {
+
       String host = offer.getHostname();
-      Set<Integer> selectedPorts =
-          Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
+      Set<Integer> selectedPorts = Resources.getPorts(offer, numRequestedPorts);
       IAssignedTask assigned = stateManager.assignTask(
           storeProvider,
-          Tasks.id(task),
+          taskId,
           host,
           offer.getSlaveId(),
           selectedPorts);
       LOG.info(String.format("Offer on slave %s (id %s) is being assigned task for %s.",
-          host, offer.getSlaveId().getValue(), Tasks.id(task)));
+          host, offer.getSlaveId().getValue(), taskId));
       return taskFactory.createFrom(assigned, offer.getSlaveId());
     }
 
@@ -95,20 +96,20 @@ public interface TaskAssigner {
     public Optional<TaskInfo> maybeAssign(
         MutableStoreProvider storeProvider,
         HostOffer offer,
-        IScheduledTask task,
-        AttributeAggregate attributeAggregate) {
+        ResourceRequest resourceRequest) {
 
       Set<Veto> vetoes = filter.filter(
-          ResourceSlot.from(offer.getOffer()),
-          offer.getAttributes(),
-          task.getAssignedTask().getTask(),
-          Tasks.id(task),
-          attributeAggregate);
+          new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+          resourceRequest);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(storeProvider, offer.getOffer(), task));
+        return Optional.of(assign(
+            storeProvider,
+            offer.getOffer(),
+            resourceRequest.getNumRequestedPorts(),
+            resourceRequest.getTaskId()));
       } else {
-        LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
-            + ": " + vetoes);
+        LOG.fine("Slave " + offer.getOffer().getHostname()
+            + " vetoed task " + resourceRequest.getTaskId() + ": " + vetoes);
         return Optional.absent();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index 79d12b0..c30a8c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -136,7 +136,7 @@ public class ResourceCounter {
     DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
       @Override
       public boolean apply(ITaskConfig task) {
-        return ConfigurationManager.isDedicated(task);
+        return ConfigurationManager.isDedicated(task.getConstraints());
       }
     }),
     QUOTA_CONSUMED(new Predicate<ITaskConfig>() {
@@ -148,7 +148,7 @@ public class ResourceCounter {
     FREE_POOL_CONSUMED(new Predicate<ITaskConfig>() {
       @Override
       public boolean apply(ITaskConfig task) {
-        return !ConfigurationManager.isDedicated(task) && !task.isProduction();
+        return !ConfigurationManager.isDedicated(task.getConstraints()) && !task.isProduction();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 8b0367e..69108cf 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -44,12 +44,13 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -58,7 +59,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -571,18 +571,15 @@ public class PreemptorImplTest extends EasyMockTest {
 
   private IExpectationSetters<Set<Veto>> expectFiltering() {
     return expect(schedulingFilter.filter(
-        EasyMock.<ResourceSlot>anyObject(),
-        EasyMock.<IHostAttributes>anyObject(),
-        EasyMock.<ITaskConfig>anyObject(),
-        EasyMock.<String>anyObject(),
-        EasyMock.eq(emptyJob))).andAnswer(
-        new IAnswer<Set<Veto>>() {
-          @Override
-          public Set<Veto> answer() {
-            return ImmutableSet.of();
-          }
-        }
-    );
+        EasyMock.<UnusedResource>anyObject(),
+        EasyMock.<ResourceRequest>anyObject()))
+        .andAnswer(
+            new IAnswer<Set<Veto>>() {
+            @Override
+            public Set<Veto> answer() {
+              return ImmutableSet.of();
+            }
+          });
   }
 
   private void expectPreempted(ScheduledTask preempted) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 9bc6a75..17f2d77 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -43,6 +43,7 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -134,7 +135,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, task, emptyJob))
+    expect(assigner.maybeAssign(
+        storageUtil.mutableStoreProvider,
+        OFFER,
+        new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob)))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
   }
 
@@ -194,7 +198,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
 
-    expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, TASK_B, emptyJob))
+    expect(assigner.maybeAssign(
+        storageUtil.mutableStoreProvider,
+        OFFER,
+        new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob)))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();
@@ -307,8 +314,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
         eq(OFFER),
-        eq(taskA),
-        eq(emptyJob)))
+        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob))))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index d1bc9bf..012804a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -51,6 +51,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -298,8 +299,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     return expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
         eq(offer),
-        eq(task),
-        eq(jobAggregate)));
+        eq(new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), jobAggregate))));
   }
 
   @Test
@@ -524,17 +524,16 @@ public class TaskSchedulerTest extends EasyMockTest {
     captureB.getValue().run();
   }
 
-  private Capture<IScheduledTask> expectTaskScheduled(IScheduledTask task) {
+  private Capture<ResourceRequest> expectTaskScheduled(IScheduledTask task) {
     TaskInfo mesosTask = makeTaskInfo(task);
-    Capture<IScheduledTask> taskScheduled = createCapture();
+    Capture<ResourceRequest> request = createCapture();
     expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
         EasyMock.<HostOffer>anyObject(),
-        capture(taskScheduled),
-        EasyMock.eq(emptyJob)))
+        capture(request)))
         .andReturn(Optional.of(mesosTask));
     driver.launchTask(EasyMock.<OfferID>anyObject(), eq(mesosTask));
-    return taskScheduled;
+    return request;
   }
 
   @Test
@@ -566,8 +565,8 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
-    Capture<IScheduledTask> firstScheduled = expectTaskScheduled(jobA0);
-    Capture<IScheduledTask> secondScheduled = expectTaskScheduled(jobB0);
+    Capture<ResourceRequest> firstScheduled = expectTaskScheduled(jobA0);
+    Capture<ResourceRequest> secondScheduled = expectTaskScheduled(jobB0);
 
     // Expect another watch of the task group for job A.
     expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -585,8 +584,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     timeoutA.getValue().run();
     timeoutB.getValue().run();
     assertEquals(
-        ImmutableSet.of(jobA0, jobB0),
-        ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
+        ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
+        ImmutableSet.of(
+            firstScheduled.getValue().getTaskId(),
+            secondScheduled.getValue().getTaskId()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 94f0a17..d408ec0 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -26,6 +26,8 @@ import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -43,10 +45,11 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
       .setNumCpus(1)
       .setRamMb(1024)
       .setDiskMb(1024));
-  private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
   private static final String TASK_ID = "taskId";
-  private static final IHostAttributes ATTRIBUTES =
-      IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE));
+  private static final UnusedResource RESOURCE = new UnusedResource(
+      ResourceSlot.from(TASK),
+      IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
+  private ResourceRequest request;
 
   private static final Veto VETO_1 = new Veto("veto1", 1);
   private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -54,36 +57,36 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private SchedulingFilter filter;
   private EventSink eventSink;
   private SchedulingFilter delegate;
-  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() {
     delegate = createMock(SchedulingFilter.class);
     eventSink = createMock(EventSink.class);
     filter = new NotifyingSchedulingFilter(delegate, eventSink);
-    emptyJob = new AttributeAggregate(
+    AttributeAggregate emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
         createMock(AttributeStore.class));
+    request = new ResourceRequest(TASK, TASK_ID, emptyJob);
   }
 
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
     eventSink.post(new Vetoed(TASK_ID, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(RESOURCE, request));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(RESOURCE, request));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index e113eba..d1a7066 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -39,6 +39,8 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -54,8 +56,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
-import static org.apache.aurora.scheduler.filter.ConstraintFilter.limitVeto;
-import static org.apache.aurora.scheduler.filter.ConstraintFilter.mismatchVeto;
+import static org.apache.aurora.scheduler.filter.ConstraintMatcher.limitVeto;
+import static org.apache.aurora.scheduler.filter.ConstraintMatcher.mismatchVeto;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.DEDICATED_HOST_VETO;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
@@ -63,7 +65,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class SchedulingFilterImplTest extends EasyMockTest {
 
@@ -171,15 +172,26 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     Set<Veto> none = ImmutableSet.of();
     IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
-    assertEquals(none,
-        defaultFilter.filter(twoPorts, hostA, noPortTask, TASK_ID, emptyJob));
-    assertEquals(none,
-        defaultFilter.filter(twoPorts, hostA, onePortTask, TASK_ID, emptyJob));
-    assertEquals(none,
-        defaultFilter.filter(twoPorts, hostA, twoPortTask, TASK_ID, emptyJob));
+    assertEquals(
+        none,
+        defaultFilter.filter(
+            new UnusedResource(twoPorts, hostA),
+            new ResourceRequest(noPortTask, TASK_ID, emptyJob)));
+    assertEquals(
+        none,
+        defaultFilter.filter(
+            new UnusedResource(twoPorts, hostA),
+            new ResourceRequest(onePortTask, TASK_ID, emptyJob)));
+    assertEquals(
+        none,
+        defaultFilter.filter(
+            new UnusedResource(twoPorts, hostA),
+            new ResourceRequest(twoPortTask, TASK_ID, emptyJob)));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, hostA, threePortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(
+            new UnusedResource(twoPorts, hostA),
+            new ResourceRequest(threePortTask, TASK_ID, emptyJob)));
   }
 
   @Test
@@ -260,7 +272,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     assertVetoes(makeTask(),
         hostAttributes(HOST_A, MaintenanceMode.DRAINING, host(HOST_A), rack(RACK_A)),
-        ConstraintFilter.maintenanceVeto("draining"));
+        ConstraintMatcher.maintenanceVeto("draining"));
   }
 
   @Test
@@ -270,7 +282,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(),
         hostAttributes(HOST_A, MaintenanceMode.DRAINED, host(HOST_A), rack(RACK_A)),
-        ConstraintFilter.maintenanceVeto("drained"));
+        ConstraintMatcher.maintenanceVeto("drained"));
   }
 
   @Test
@@ -435,8 +447,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint zoneConstraint = makeConstraint("zone", "c");
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
-    assertTrue(
-        defaultFilter.filter(DEFAULT_OFFER, hostA, task, TASK_ID, emptyJob).isEmpty());
+    assertEquals(
+        ImmutableSet.<Veto>of(),
+        defaultFilter.filter(
+            new UnusedResource(DEFAULT_OFFER, hostA),
+            new ResourceRequest(task, TASK_ID, emptyJob)));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -524,11 +539,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertEquals(
         expected,
         defaultFilter.filter(
-             DEFAULT_OFFER,
-            hostAttributes,
-            task,
-            TASK_ID,
-            aggregate).isEmpty());
+            new UnusedResource(DEFAULT_OFFER, hostAttributes),
+            new ResourceRequest(task, TASK_ID, aggregate))
+            .isEmpty());
 
     Constraint negated = constraint.deepCopy();
     negated.getConstraint().getValue().setNegated(!value.isNegated());
@@ -536,11 +549,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertEquals(
         !expected,
         defaultFilter.filter(
-            DEFAULT_OFFER,
-            hostAttributes,
-            negatedTask,
-            TASK_ID,
-            aggregate).isEmpty());
+            new UnusedResource(DEFAULT_OFFER, hostAttributes),
+            new ResourceRequest(negatedTask, TASK_ID, aggregate))
+            .isEmpty());
     return task;
   }
 
@@ -568,7 +579,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     assertEquals(
         ImmutableSet.copyOf(vetoes),
-        defaultFilter.filter(DEFAULT_OFFER, hostAttributes, task, TASK_ID, jobState));
+        defaultFilter.filter(
+            new UnusedResource(DEFAULT_OFFER, hostAttributes),
+            new ResourceRequest(task, TASK_ID, jobState)));
   }
 
   private static IHostAttributes hostAttributes(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5cdc3900/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index a8a70b6..411a55a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -28,6 +28,8 @@ import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
@@ -102,11 +104,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
   @Test
   public void testAssignNoVetoes() {
     expect(filter.filter(
-        ResourceSlot.from(MESOS_OFFER),
-        OFFER.getAttributes(),
-        TASK.getAssignedTask().getTask(),
-        Tasks.id(TASK),
-        emptyJob))
+        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
         .andReturn(ImmutableSet.<Veto>of());
     expect(stateManager.assignTask(
         storeProvider,
@@ -122,23 +121,26 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     assertEquals(
         Optional.of(TASK_INFO),
-        assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
+        assigner.maybeAssign(
+            storeProvider,
+            OFFER,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
   }
 
   @Test
   public void testAssignVetoes() {
     expect(filter.filter(
-        ResourceSlot.from(MESOS_OFFER),
-        OFFER.getAttributes(),
-        TASK.getAssignedTask().getTask(),
-        Tasks.id(TASK),
-        emptyJob))
+        new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+        new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
 
     assertEquals(
         Optional.<TaskInfo>absent(),
-        assigner.maybeAssign(storeProvider, OFFER, TASK, emptyJob));
+        assigner.maybeAssign(
+            storeProvider,
+            OFFER,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
   }
 }