You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 20:11:50 UTC

git commit: Work in progress, still updating tests.

Updated Branches:
  refs/heads/wfarner/AURORA-139 [created] ab5395b3e


Work in progress, still updating tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ab5395b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ab5395b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ab5395b3

Branch: refs/heads/wfarner/AURORA-139
Commit: ab5395b3e345f075cad39689325d2f514154934a
Parents: 3870df3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 29 11:11:34 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 29 11:11:34 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/TaskScheduler.java   |  21 ++-
 .../scheduler/filter/AttributeFilter.java       |   9 +-
 .../aurora/scheduler/filter/CachedJobState.java |  47 +++++
 .../scheduler/filter/ConstraintFilter.java      |  19 +-
 .../scheduler/filter/SchedulingFilter.java      |   8 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |  38 ++--
 .../aurora/scheduler/state/TaskAssigner.java    |  13 +-
 .../filter/SchedulingFilterImplTest.java        | 180 ++++++++++---------
 8 files changed, 196 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 4afc332..75843ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -31,6 +32,7 @@ import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -40,14 +42,18 @@ import com.twitter.common.stats.StatImpl;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.SlaveID;
@@ -126,24 +132,32 @@ interface TaskScheduler extends EventSubscriber {
       this.reservations = new Reservations(reservationDuration, clock);
     }
 
+    private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
     private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+        TaskStore taskStore,
         final String taskId,
         final IScheduledTask task) {
 
+      final CachedJobState cachedJobState = new CachedJobState(taskStore.fetchTasks(
+          Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+              .byStatus(ACTIVE_NOT_PENDING_STATES)));
+
       return new Function<Offer, Optional<TaskInfo>>() {
         @Override public Optional<TaskInfo> apply(Offer offer) {
           Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(offer, task);
+              return assigner.maybeAssign(offer, task, cachedJobState);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(offer, task);
+            return assigner.maybeAssign(offer, task, cachedJobState);
           }
         }
       };
@@ -168,7 +182,8 @@ interface TaskScheduler extends EventSubscriber {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
               try {
-                if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+                if (!offerQueue.launchFirst(
+                    getAssignerFunction(store.getTaskStore(), taskId, task))) {
                   // Task could not be scheduled.
                   maybePreemptFor(taskId);
                   return TaskSchedulerResult.TRY_AGAIN;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
index 0816d3f..5fe59c8 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -64,21 +63,16 @@ final class AttributeFilter {
    * Tests whether an attribute matches a limit constraint.
    *
    * @param attributes Attributes to match against.
-   * @param jobKey Key of the job with the limited constraint.
    * @param limit Limit value.
    * @param activeTasks All active tasks in the system.
    * @param attributeFetcher Interface for fetching attributes for hosts in the system.
    * @return {@code true} if the limit constraint is satisfied, {@code false} otherwise.
    */
   static boolean matches(final Set<Attribute> attributes,
-      final IJobKey jobKey,
       int limit,
       Iterable<IScheduledTask> activeTasks,
       final AttributeLoader attributeFetcher) {
 
-    Predicate<IScheduledTask> sameJob =
-        Predicates.compose(Predicates.equalTo(jobKey), Tasks.SCHEDULED_TO_JOB_KEY);
-
     Predicate<IScheduledTask> hasAttribute = new Predicate<IScheduledTask>() {
       @Override public boolean apply(IScheduledTask task) {
         Iterable<Attribute> hostAttributes =
@@ -87,7 +81,6 @@ final class AttributeFilter {
       }
     };
 
-    return limit > Iterables.size(
-        Iterables.filter(activeTasks, Predicates.and(sameJob, hasAttribute)));
+    return limit > Iterables.size(Iterables.filter(activeTasks, hasAttribute));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
new file mode 100644
index 0000000..7d764b9
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * TODO(wfarner): Take this caching one step further and don't store tasks here, but instead store
+ * pre-computed aggregates of host attributes.  For example, translate each IScheduledTask into
+ * the HostAttributes of the machine it resides on, and count the number of times each attribute is
+ * seen.  This could be represented in a {@code Map<String, Multiset<String>>}, where the outer
+ * key is the attribute name, and the inner key is the attribute value.  So, for a job with
+ * two tasks on the same rack but different hosts, you could have this aggregate:
+ * <pre>
+ * { "host": {"hostA": 1, "hostB": 1},
+ *   "rack": {"rack1": 2}
+ * }
+ * </pre>
+ */
+public class CachedJobState {
+
+  private final ImmutableSet<IScheduledTask> activeTasks;
+
+  public CachedJobState(ImmutableSet<IScheduledTask> activeTasks) {
+    this.activeTasks = Preconditions.checkNotNull(activeTasks);
+  }
+
+  public ImmutableSet<IScheduledTask> getActiveTasks() {
+    return activeTasks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
index 17fc8b9..8c2313c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
@@ -15,7 +15,6 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Collection;
 import java.util.Set;
 import java.util.logging.Logger;
 
@@ -23,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -32,8 +30,6 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -45,27 +41,23 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
 
   private static final Logger LOG = Logger.getLogger(ConstraintFilter.class.getName());
 
-  private final IJobKey jobKey;
-  private final Supplier<Collection<IScheduledTask>> activeTasksSupplier;
+  private final CachedJobState cachedjobState;
   private final AttributeLoader attributeLoader;
   private final Iterable<Attribute> hostAttributes;
 
   /**
    * Creates a new constraint filer for a given job.
    *
-   * @param jobKey Key for the job.
-   * @param activeTasksSupplier Supplier to fetch active tasks (if necessary).
+   * @param cachedjobState Cached information about the job containing the task being matched.
    * @param attributeLoader Interface to fetch host attributes (if necessary).
    * @param hostAttributes The attributes of the host to test against.
    */
   ConstraintFilter(
-      IJobKey jobKey,
-      Supplier<Collection<IScheduledTask>> activeTasksSupplier,
+      CachedJobState cachedjobState,
       AttributeLoader attributeLoader,
       Iterable<Attribute> hostAttributes) {
 
-    this.jobKey = checkNotNull(jobKey);
-    this.activeTasksSupplier = checkNotNull(activeTasksSupplier);
+    this.cachedjobState = checkNotNull(cachedjobState);
     this.attributeLoader = checkNotNull(attributeLoader);
     this.hostAttributes = checkNotNull(hostAttributes);
   }
@@ -106,9 +98,8 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
 
         boolean satisfied = AttributeFilter.matches(
             attributes,
-            jobKey,
             taskConstraint.getLimit().getLimit(),
-            activeTasksSupplier.get(),
+            cachedjobState.getActiveTasks(),
             attributeLoader);
         return satisfied
             ? Optional.<Veto>absent()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 999e0f7..f244c35 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -109,8 +109,14 @@ public interface SchedulingFilter {
    * @param slaveHost Host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
+   * @param cachedjobState Cached information about the job containing {@code task}.
    * @return A set of vetoes indicating reasons the task cannot be scheduled.  If the task may be
    *    scheduled, the set will be empty.
    */
-  Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId);
+  Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState cachedjobState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 4ba1483..784dde6 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -15,7 +15,6 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.Set;
@@ -25,23 +24,17 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -49,7 +42,6 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -190,7 +182,8 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private static final Ordering<IConstraint> VALUES_FIRST = Ordering.from(
       new Comparator<IConstraint>() {
-        @Override public int compare(IConstraint a, IConstraint b) {
+        @Override
+        public int compare(IConstraint a, IConstraint b) {
           if (a.getConstraint().getSetField() == b.getConstraint().getSetField()) {
             return 0;
           }
@@ -198,10 +191,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         }
       });
 
-  private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
-      EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
-  private FilterRule getConstraintFilter(final String slaveHost) {
+  private FilterRule getConstraintFilter(final CachedJobState jobState, final String slaveHost) {
     return new FilterRule() {
       @Override public Iterable<Veto> apply(final ITaskConfig task) {
         if (!task.isSetConstraints()) {
@@ -220,18 +210,8 @@ public class SchedulingFilterImpl implements SchedulingFilter {
               }
             };
 
-            Supplier<Collection<IScheduledTask>> activeTasksSupplier =
-                Suppliers.memoize(new Supplier<Collection<IScheduledTask>>() {
-                  @Override public Collection<IScheduledTask> get() {
-                    return storeProvider.getTaskStore().fetchTasks(
-                        Query.jobScoped(Tasks.INFO_TO_JOB_KEY.apply(task))
-                            .byStatus(ACTIVE_NOT_PENDING_STATES));
-                  }
-                });
-
             ConstraintFilter constraintFilter = new ConstraintFilter(
-                Tasks.INFO_TO_JOB_KEY.apply(task),
-                activeTasksSupplier,
+                jobState,
                 attributeLoader,
                 attributeLoader.apply(slaveHost));
             ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
@@ -281,12 +261,18 @@ public class SchedulingFilterImpl implements SchedulingFilter {
   }
 
   @Override
-  public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
+  public Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState cachedJobState) {
+
     if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(slaveHost).apply(task))
+        .addAll(getConstraintFilter(cachedJobState, slaveHost).apply(task))
         .addAll(getResourceVetoes(offer, task))
         .addAll(getMaintenanceVeto(slaveHost).asSet())
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index d8f326e..3154ef0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -26,6 +26,7 @@ import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -46,9 +47,10 @@ public interface TaskAssigner {
    *
    * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
+   * @param cachedJobState Cached information about the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
-  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
+  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task, CachedJobState cachedJobState);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -83,12 +85,17 @@ public interface TaskAssigner {
     }
 
     @Override
-    public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
+    public Optional<TaskInfo> maybeAssign(
+        Offer offer,
+        IScheduledTask task,
+        CachedJobState cachedJobState) {
+
       Set<Veto> vetoes = filter.filter(
           ResourceSlot.from(offer),
           offer.getHostname(),
           task.getAssignedTask().getTask(),
-          Tasks.id(task));
+          Tasks.id(task),
+          cachedJobState);
       if (vetoes.isEmpty()) {
         return Optional.of(assign(offer, task));
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index f3df73c..b097098 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -39,7 +39,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -47,7 +46,6 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.easymock.EasyMock;
@@ -64,7 +62,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -103,13 +100,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Amount.of(DEFAULT_DISK, Data.MB),
       0);
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(ImmutableSet.<IScheduledTask>of());
+
   private final AtomicLong taskIdCounter = new AtomicLong();
 
   private SchedulingFilter defaultFilter;
   private MaintenanceController maintenance;
   private Storage storage;
   private StoreProvider storeProvider;
-  private TaskStore.Mutable taskStore;
   private AttributeStore.Mutable attributeStore;
 
   @Before
@@ -118,13 +117,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     maintenance = createMock(MaintenanceController.class);
     defaultFilter = new SchedulingFilterImpl(storage, maintenance);
     storeProvider = createMock(StoreProvider.class);
-    taskStore = createMock(TaskStore.Mutable.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
 
     // Link the store provider to the store mocks.
     expectReads();
 
-    expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
     expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
   }
 
@@ -142,19 +139,17 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testMeetsOffer() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(2);
-    expectGetTasks().times(2);
 
     control.replay();
 
-    assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
-    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1));
+    assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), EMPTY_JOB);
+    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), EMPTY_JOB);
   }
 
   @Test
   public void testSufficientPorts() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(4);
-    expectGetTasks().times(4);
 
     control.replay();
 
@@ -178,28 +173,28 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
 
     Set<Veto> none = ImmutableSet.of();
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, EMPTY_JOB));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, EMPTY_JOB));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, EMPTY_JOB));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID));
+        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, EMPTY_JOB));
   }
 
   @Test
   public void testInsufficientResources() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(4);
-    expectGetTasks().times(4);
 
     control.replay();
 
     assertVetoes(
         makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
+        EMPTY_JOB,
         CPU.veto(1), DISK.veto(1), RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), CPU.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DISK.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), EMPTY_JOB, CPU.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), EMPTY_JOB, RAM.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), EMPTY_JOB, DISK.veto(1));
   }
 
   @Test
@@ -209,13 +204,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     control.replay();
 
-    checkConstraint(HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
-    assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, DEDICATED_HOST_VETO);
+    checkConstraint(EMPTY_JOB, HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
+    assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, EMPTY_JOB, DEDICATED_HOST_VETO);
   }
 
   @Test
   public void testSharedDedicatedHost() throws Exception {
-    String dedicated1 = "ads/adserver";
+    String dedicated1 = "userA/jobA";
     String dedicated2 = "kestrel/kestrel";
 
     expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
@@ -224,19 +219,23 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     assertNoVetoes(checkConstraint(
-        new Identity().setRole("ads"),
-        "adserver",
+        new Identity().setRole("userA"),
+        "jobA",
+        EMPTY_JOB,
         HOST_A,
         DEDICATED_ATTRIBUTE,
         true,
-        dedicated1));
+        dedicated1),
+        EMPTY_JOB);
     assertNoVetoes(checkConstraint(
         new Identity().setRole("kestrel"),
         "kestrel",
+        EMPTY_JOB,
         HOST_A,
         DEDICATED_ATTRIBUTE,
         true,
-        dedicated2));
+        dedicated2),
+        EMPTY_JOB);
   }
 
   @Test
@@ -248,44 +247,41 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     control.replay();
 
-    checkConstraint(HOST_A, "jvm", true, "1.0");
-    checkConstraint(HOST_A, "jvm", false, "4.0");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "4.0");
 
-    checkConstraint(HOST_A, "jvm", true, "1.0", "2.0");
-    checkConstraint(HOST_B, "jvm", false, "2.0", "3.0");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0", "2.0");
+    checkConstraint(EMPTY_JOB, HOST_B, "jvm", false, "2.0", "3.0");
   }
 
   @Test
   public void testHostScheduledForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.SCHEDULED);
 
     control.replay();
 
-    assertNoVetoes(makeTask(), HOST_A);
+    assertNoVetoes(makeTask(), HOST_A, EMPTY_JOB);
   }
 
   @Test
   public void testHostDrainingForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINING);
 
     control.replay();
 
-    assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("draining"));
+    assertVetoes(makeTask(), EMPTY_JOB, ConstraintFilter.maintenanceVeto("draining"));
   }
 
   @Test
   public void testHostDrainedForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINED);
 
     control.replay();
 
-    assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("drained"));
+    assertVetoes(makeTask(), EMPTY_JOB, ConstraintFilter.maintenanceVeto("drained"));
   }
 
   @Test
@@ -300,9 +296,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint constraint1 = makeConstraint("host", HOST_A);
     Constraint constraint2 = makeConstraint(DEDICATED_ATTRIBUTE, "xxx");
 
-    assertVetoes(makeTask(OWNER_A, JOB_A, constraint1, constraint2), HOST_A,
+    assertVetoes(
+        makeTask(OWNER_A, JOB_A, constraint1, constraint2),
+        HOST_A,
+        EMPTY_JOB,
         mismatchVeto(DEDICATED_ATTRIBUTE));
-    assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B);
+    assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, EMPTY_JOB);
   }
 
   @Test
@@ -321,10 +320,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertVetoes(
         makeTask(OWNER_A, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
         HOST_A,
+        EMPTY_JOB,
         mismatchVeto(DEDICATED_ATTRIBUTE));
     assertVetoes(
         makeTask(OWNER_B, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
         HOST_B,
+        EMPTY_JOB,
         mismatchVeto(DEDICATED_ATTRIBUTE));
   }
 
@@ -332,12 +333,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testUnderLimitNoTasks() throws Exception {
     expectGetHostAttributes(HOST_A);
     expectGetHostAttributes(HOST_A, host(HOST_A));
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A);
 
     control.replay();
 
-    assertNoVetoes(hostLimitTask(2), HOST_A);
+    assertNoVetoes(hostLimitTask(2), HOST_A, EMPTY_JOB);
   }
 
   private Attribute host(String host) {
@@ -361,31 +361,30 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
 
-    expectGetTasks(
+    CachedJobState jobState = new CachedJobState(ImmutableSet.of(
         makeScheduledTask(OWNER_A, JOB_A, HOST_A),
         makeScheduledTask(OWNER_B, JOB_A, HOST_A),
         makeScheduledTask(OWNER_B, JOB_A, HOST_A),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
         makeScheduledTask(OWNER_B, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_C))
-        .atLeastOnce();
+        makeScheduledTask(OWNER_A, JOB_A, HOST_C)));
 
     control.replay();
 
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A);
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, limitVeto(HOST_ATTRIBUTE));
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, limitVeto(HOST_ATTRIBUTE));
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B);
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, jobState);
+    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, jobState, limitVeto(HOST_ATTRIBUTE));
+    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, jobState, limitVeto(HOST_ATTRIBUTE));
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, jobState);
 
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, limitVeto(RACK_ATTRIBUTE));
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B);
+    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, jobState, limitVeto(RACK_ATTRIBUTE));
+    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, jobState, limitVeto(RACK_ATTRIBUTE));
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, jobState);
 
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C);
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, jobState);
 
-    assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C);
+    assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, jobState, limitVeto(RACK_ATTRIBUTE));
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, jobState);
   }
 
   @Test
@@ -396,19 +395,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     // Matches attribute, matching value.
-    checkConstraint(HOST_A, "jvm", true, "1.0");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0");
 
     // Matches attribute, different value.
-    checkConstraint(HOST_A, "jvm", false, "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.4");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.4");
 
     // Logical 'OR' matching attribute.
-    checkConstraint(HOST_A, "jvm", false, "1.2", "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.2", "1.4");
 
     // Logical 'OR' not matching attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.0", "1.4");
   }
 
   @Test
@@ -421,32 +420,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     // Matches attribute, matching value.
-    checkConstraint(HOST_A, "jvm", true, "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.4");
 
     // Matches attribute, different value.
-    checkConstraint(HOST_A, "jvm", false, "1.0");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.0");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.4");
 
     // Logical 'OR' with attribute and value match.
-    checkConstraint(HOST_A, "jvm", true, "1.2", "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.2", "1.4");
 
     // Does not match attribute.
-    checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+    checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.0", "1.4");
 
     // Check that logical AND works.
     Constraint jvmConstraint = makeConstraint("jvm", "1.6");
     Constraint zoneConstraint = makeConstraint("zone", "c");
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
-    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID).isEmpty());
+    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, EMPTY_JOB).isEmpty());
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
     Constraint zoneNegated = jvmConstraint.deepCopy();
     zoneNegated.getConstraint().getValue().setNegated(true);
-    assertVetoes(makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated), HOST_A,
+    assertVetoes(
+        makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
+        HOST_A,
+        EMPTY_JOB,
         mismatchVeto("jvm"));
   }
 
@@ -461,25 +463,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   private ITaskConfig checkConstraint(
+      CachedJobState cachedJobState,
       String host,
       String constraintName,
       boolean expected,
       String value,
       String... vs) {
 
-    return checkConstraint(OWNER_A, JOB_A, host, constraintName, expected, value, vs);
+    return checkConstraint(
+        OWNER_A,
+        JOB_A,
+        cachedJobState,
+        host,
+        constraintName,
+        expected,
+        value,
+        vs);
   }
 
   private ITaskConfig checkConstraint(
       Identity owner,
       String jobName,
+      CachedJobState cachedJobState,
       String host,
       String constraintName,
       boolean expected,
       String value,
       String... vs) {
 
-    return checkConstraint(owner, jobName, host, constraintName, expected,
+    return checkConstraint(owner, jobName, cachedJobState, host, constraintName, expected,
         new ValueConstraint(false,
             ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
   }
@@ -487,6 +499,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig checkConstraint(
       Identity owner,
       String jobName,
+      CachedJobState cachedJobState,
       String host,
       String constraintName,
       boolean expected,
@@ -496,32 +509,38 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     ITaskConfig task = makeTask(owner, jobName, constraint);
     assertEquals(
         expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, cachedJobState).isEmpty());
 
     Constraint negated = constraint.deepCopy();
     negated.getConstraint().getValue().setNegated(!value.isNegated());
     ITaskConfig negatedTask = makeTask(owner, jobName, negated);
     assertEquals(
         !expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, cachedJobState).isEmpty());
     return task;
   }
 
-  private void assertNoVetoes(ITaskConfig task) {
-    assertNoVetoes(task, HOST_A);
+  private void assertNoVetoes(ITaskConfig task, CachedJobState jobState) {
+    assertNoVetoes(task, HOST_A, jobState);
   }
 
-  private void assertNoVetoes(ITaskConfig task, String host) {
-    assertVetoes(task, host);
+  private void assertNoVetoes(ITaskConfig task, String host, CachedJobState jobState) {
+    assertVetoes(task, host, jobState);
   }
 
-  private void assertVetoes(ITaskConfig task, Veto... vetos) {
-    assertVetoes(task, HOST_A, vetos);
+  private void assertVetoes(ITaskConfig task, CachedJobState jobState, Veto... vetos) {
+    assertVetoes(task, HOST_A, jobState, vetos);
   }
 
-  private void assertVetoes(ITaskConfig task, String host, Veto... vetoes) {
-    assertEquals(ImmutableSet.copyOf(vetoes),
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID));
+  private void assertVetoes(
+      ITaskConfig task,
+      String host,
+      CachedJobState jobState,
+      Veto... vetoes) {
+
+    assertEquals(
+        ImmutableSet.copyOf(vetoes),
+        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, jobState));
   }
 
   private Attribute valueAttribute(String name, String string, String... strings) {
@@ -534,13 +553,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
   }
 
-  private IExpectationSetters<ImmutableSet<IScheduledTask>> expectGetTasks(
-      IScheduledTask... tasks) {
-
-    return expect(taskStore.fetchTasks((Query.Builder) anyObject()))
-        .andReturn(ImmutableSet.copyOf(tasks));
-  }
-
   private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(String host) {
     return expectGetHostMaintenanceStatus(host, MaintenanceMode.NONE);
   }