You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/14 22:52:26 UTC

git commit: Begin cleanup of StateManager interface by removing state change via Query.

Updated Branches:
  refs/heads/master 730cff698 -> a6ab7fdb7


Begin cleanup of StateManager interface by removing state change via Query.

State changes via a broad query are a rarely-used feature, and probably usually
not a good idea.  The query-based API was useful because it sometimes saved
code at the call site, but the big downside is that it makes it more difficult
for StateManager to perform course-correcting operations.
The test case added to StateManagerImplTest illustrates a now-fixed regression
that could contribute to state drift between the scheduler and the outside
world.

Bugs closed: AURORA-27

Reviewed at https://reviews.apache.org/r/16767/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a6ab7fdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a6ab7fdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a6ab7fdb

Branch: refs/heads/master
Commit: a6ab7fdb7b6b4dfe724f47a58633526a87f34e8f
Parents: 730cff6
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 14 13:52:17 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 14 13:52:17 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/UserTaskLauncher.java      |  4 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  8 +--
 .../aurora/scheduler/async/TaskThrottler.java   |  4 +-
 .../aurora/scheduler/async/TaskTimeout.java     |  9 ++-
 .../scheduler/state/MaintenanceController.java  | 21 +++---
 .../aurora/scheduler/state/SchedulerCore.java   |  4 +-
 .../scheduler/state/SchedulerCoreImpl.java      | 60 +++++++++++-----
 .../aurora/scheduler/state/StateManager.java    | 32 ++++++---
 .../scheduler/state/StateManagerImpl.java       | 76 ++++++--------------
 .../thrift/SchedulerThriftInterface.java        |  3 +-
 .../aurora/scheduler/UserTaskLauncherTest.java  | 19 +++--
 .../scheduler/async/TaskSchedulerTest.java      |  5 +-
 .../scheduler/async/TaskThrottlerTest.java      |  6 +-
 .../aurora/scheduler/async/TaskTimeoutTest.java | 26 +++++--
 .../state/BaseSchedulerCoreImplTest.java        |  4 +-
 .../state/MaintenanceControllerImplTest.java    |  7 +-
 .../scheduler/state/StateManagerImplTest.java   | 31 +++++---
 .../thrift/SchedulerThriftInterfaceTest.java    |  2 +-
 18 files changed, 181 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 7e180ac..010776e 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.mesos.Protos.Offer;
@@ -85,7 +84,8 @@ class UserTaskLauncher implements TaskLauncher {
       }
 
       stateManager.changeState(
-          Query.taskScoped(status.getTaskId().getValue()),
+          status.getTaskId().getValue(),
+          Optional.<ScheduleStatus>absent(),
           translatedState,
           Optional.fromNullable(message));
     } catch (SchedulerException e) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 96c76ba..4afc332 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -161,9 +161,9 @@ interface TaskScheduler extends EventSubscriber {
         return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
           @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
             LOG.fine("Attempting to schedule task " + taskId);
-            Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
-            final IScheduledTask task =
-                Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+            final IScheduledTask task = Iterables.getOnlyElement(
+                store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+                null);
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
@@ -182,7 +182,7 @@ interface TaskScheduler extends EventSubscriber {
                 // It is in the LOST state and a new task will move to PENDING to replace it.
                 // Should the state change fail due to storage issues, that's okay.  The task will
                 // time out in the ASSIGNED state and be moved to LOST.
-                stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+                stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
index 11a01ea..fd6c188 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -25,7 +25,6 @@ import com.google.common.eventbus.Subscribe;
 import com.twitter.common.stats.SlidingStats;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -75,7 +74,8 @@ class TaskThrottler implements EventSubscriber {
           new Runnable() {
             @Override public void run() {
               stateManager.changeState(
-                  Query.taskScoped(stateChange.getTaskId()).byStatus(THROTTLED),
+                  stateChange.getTaskId(),
+                  Optional.of(THROTTLED),
                   PENDING,
                   Optional.<String>absent());
             }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 9f14a99..64a1941 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -42,7 +42,6 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -186,9 +185,13 @@ class TaskTimeout implements EventSubscriber {
         // timeout is still valid.  Ideally, the future would have already been canceled, but in the
         // event of a state transition race, including transientState prevents an unintended
         // task timeout.
-        Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
         // Note: This requires LOST transitions trigger Driver.killTask.
-        if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+        if (stateManager.changeState(
+            key.taskId,
+            Optional.of(key.status),
+            ScheduleStatus.LOST,
+            TIMEOUT_MESSAGE)) {
+
           timedOutTasks.incrementAndGet();
         } else {
           LOG.warning("Task " + key + " does not exist, or was not in the expected state.");

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3dd2271..af10d44 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
-import com.twitter.common.base.Closure;
 
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.HostStatus;
@@ -119,11 +118,7 @@ public interface MaintenanceController {
       this.eventSink = checkNotNull(eventSink);
     }
 
-    private Set<HostStatus> watchDrainingTasks(
-        MutableStoreProvider store,
-        Set<String> hosts,
-        Closure<Query.Builder> callback) {
-
+    private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
       Set<String> emptyHosts = Sets.newHashSet();
       for (String host : hosts) {
         // If there are no tasks on the host, immediately transition to DRAINED.
@@ -134,7 +129,13 @@ public interface MaintenanceController {
         if (activeTasks.isEmpty()) {
           emptyHosts.add(host);
         } else {
-          callback.execute(query);
+          for (String taskId : activeTasks) {
+            stateManager.changeState(
+                taskId,
+                Optional.<ScheduleStatus>absent(),
+                ScheduleStatus.RESTARTING,
+                DRAINING_MESSAGE);
+          }
         }
       }
 
@@ -186,11 +187,7 @@ public interface MaintenanceController {
     public Set<HostStatus> drain(final Set<String> hosts) {
       return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
         @Override public Set<HostStatus> apply(MutableStoreProvider store) {
-          return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
-            @Override public void execute(Query.Builder query) {
-              stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
-            }
-          });
+          return watchDrainingTasks(store, hosts);
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index b89d990..7a88629 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -83,11 +83,11 @@ public interface SchedulerCore {
   /**
    * Assigns a new state to tasks.
    *
-   * @param query Builder for a query to identify tasks
+   * @param taskId ID of the task to transition.
    * @param status The new state of the tasks.
    * @param message Additional information about the state transition.
    */
-  void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+  void setTaskStatus(String taskId, ScheduleStatus status, Optional<String> message);
 
   /**
    * Kills a specific set of tasks.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 1d450f2..8dec283 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -75,9 +75,8 @@ class SchedulerCoreImpl implements SchedulerCore {
   // Schedulers that are responsible for triggering execution of jobs.
   private final ImmutableList<JobManager> jobManagers;
 
-  // TODO(Bill Farner): Avoid using StateManagerImpl.
   // State manager handles persistence of task modifications and state transitions.
-  private final StateManagerImpl stateManager;
+  private final StateManager stateManager;
 
   private final TaskIdGenerator taskIdGenerator;
   private final JobFilter jobFilter;
@@ -97,7 +96,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       Storage storage,
       CronJobManager cronScheduler,
       ImmediateJobManager immediateScheduler,
-      StateManagerImpl stateManager,
+      StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
       JobFilter jobFilter) {
 
@@ -118,7 +117,9 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   @Override
   public synchronized void tasksDeleted(Set<String> taskIds) {
-    setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+    for (String taskId : taskIds) {
+      setTaskStatus(taskId, ScheduleStatus.UNKNOWN, Optional.<String>absent());
+    }
   }
 
   @Override
@@ -245,18 +246,20 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   @Override
   public synchronized void setTaskStatus(
-      Query.Builder query,
+      String taskId,
       final ScheduleStatus status,
       Optional<String> message) {
 
-    checkNotNull(query);
+    checkNotNull(taskId);
     checkNotNull(status);
 
-    stateManager.changeState(query, status, message);
+    stateManager.changeState(taskId, Optional.<ScheduleStatus>absent(), status, message);
   }
 
   @Override
-  public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+  public synchronized void killTasks(Query.Builder query, final String user)
+      throws ScheduleException {
+
     checkNotNull(query);
     LOG.info("Killing tasks matching " + query);
 
@@ -274,10 +277,28 @@ class SchedulerCoreImpl implements SchedulerCore {
     }
 
     // Unless statuses were specifically supplied, only attempt to kill active tasks.
-    Query.Builder taskQuery = query.get().isSetStatuses() ? query.byStatus(ACTIVE_STATES) : query;
+    final Query.Builder taskQuery = query.get().isSetStatuses()
+        ? query.byStatus(ACTIVE_STATES)
+        : query;
+
+    int tasksAffected = storage.write(new MutateWork.Quiet<Integer>() {
+      @Override public Integer apply(MutableStoreProvider storeProvider) {
+        int total = 0;
+        for (String taskId : Tasks.ids(storeProvider.getTaskStore().fetchTasks(taskQuery))) {
+          boolean changed = stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              KILLING,
+              Optional.of("Killed by " + user));
+
+          if (changed) {
+            total++;
+          }
+        }
+        return total;
+      }
+    });
 
-    int tasksAffected =
-        stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
     if (!jobDeleted && (tasksAffected == 0)) {
       throw new ScheduleException("No jobs to kill");
     }
@@ -307,22 +328,27 @@ class SchedulerCoreImpl implements SchedulerCore {
           throw new ScheduleException("Not all requested shards are active.");
         }
         LOG.info("Restarting shards matching " + query);
-        stateManager.changeState(
-            Query.taskScoped(Tasks.ids(matchingTasks)),
-            RESTARTING,
-            Optional.of("Restarted by " + requestingUser));
+        for (String taskId : Tasks.ids(matchingTasks)) {
+          stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              RESTARTING,
+              Optional.of("Restarted by " + requestingUser));
+        }
       }
     });
   }
 
-
   @Override
   public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
     checkNotNull(task);
     checkNotNull(preemptingTask);
     // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
 
-    stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+    stateManager.changeState(
+        task.getTaskId(),
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.PREEMPTING,
         Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 045165d..4249707 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -21,27 +21,39 @@ import java.util.Set;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
 /**
- * Thin interface for the state manager.
+ * A manager for the state of tasks.  Most modifications to tasks should be made here, especially
+ * those that alter the {@link ScheduleStatus} of tasks.
  */
 public interface StateManager {
 
   /**
-   * Performs a simple state change, transitioning all tasks matching a query to the given
-   * state and applying the given audit message.
-   * TODO(William Farner): Consider removing the return value.
+   * Attempts to alter a task from its existing state to {@code newState}. If a {@code casState}
+   * (compare and swap) is provided, the transition will only performed if the task is currently
+   * in the state.
+   *
+   * @param taskId ID of the task to transition.
+   * @param casState State that the task must be in for the operation to proceed.  If the task
+   *                 is found to not be in {@code casState}, no action is performed and
+   *                 {@code false} is returned.  This can be useful when deferring asynchronous
+   *                 work, to perform a follow-up action iff the task has not changed since the
+   *                 decision to defer the action was made.
+   * @param newState State to move the task to.
+   * @param auditMessage Message to include with the transition.
+   * @return {@code true} if the transition was performed and the task was moved to
+   *         {@code newState}, {@code false} if the transition was not allowed, or the task was not
+   *         in {@code casState}.
    *
-   * @param query Builder of the query to perform, the results of which will be modified.
-   * @param newState State to move the resulting tasks into.
-   * @param auditMessage Audit message to apply along with the state change.
-   * @return the number of successful state changes.
    */
-  int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+  boolean changeState(
+      String taskId,
+      Optional<ScheduleStatus> casState,
+      ScheduleStatus newState,
+      Optional<String> auditMessage);
 
   /**
    * Assigns a task to a specific slave.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 256f830..2b8ca09 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,8 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -54,8 +52,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -78,8 +74,9 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
  * persisted state machine transitions, and their side-effects.
  *
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
+ * TODO(wfarner): This class is due for an overhaul.  There are several aspects of it that could
+ * probably be made much simpler.  Specifically, the workQueue is particularly difficult to reason
+ * about.
  */
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
@@ -199,13 +196,15 @@ public class StateManagerImpl implements StateManager {
   }
 
   @Override
-  public int changeState(
-      Query.Builder query,
+  public boolean changeState(
+      String taskId,
+      Optional<ScheduleStatus> casState,
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(query, new Function<TaskStateMachine, Boolean>() {
-      @Override public Boolean apply(TaskStateMachine stateMachine) {
+    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
+      @Override
+      public Boolean apply(TaskStateMachine stateMachine) {
         return stateMachine.updateState(newState, auditMessage);
       }
     });
@@ -223,34 +222,26 @@ public class StateManagerImpl implements StateManager {
     checkNotNull(assignedPorts);
 
     TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(Query.taskScoped(taskId), mutation);
+    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
 
     return mutation.getAssignedTask();
   }
 
-  private int changeStateInWriteOperation(
-      Set<String> taskIds,
-      Function<TaskStateMachine, Boolean> stateChange) {
-
-    int count = 0;
-    for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
-      if (stateChange.apply(stateMachine)) {
-        ++count;
-      }
-    }
-    return count;
-  }
-
-  private int changeState(
-      final Query.Builder query,
+  private boolean changeState(
+      final String taskId,
+      final Optional<ScheduleStatus> casState,
       final Function<TaskStateMachine, Boolean> stateChange) {
 
-    return storage.write(storage.new QuietSideEffectWork<Integer>() {
-      @Override public Integer apply(MutableStoreProvider storeProvider) {
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        return changeStateInWriteOperation(ids, stateChange);
+    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        IScheduledTask task = Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null);
+        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
+          return false;
+        }
+
+        return stateChange.apply(getStateMachine(taskId, task));
       }
     });
   }
@@ -416,27 +407,6 @@ public class StateManagerImpl implements StateManager {
     });
   }
 
-  private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
-    return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
-      @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
-        Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
-            new Function<IScheduledTask, String>() {
-              @Override public String apply(IScheduledTask input) {
-                return input.getAssignedTask().getTaskId();
-              }
-            });
-
-        ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
-        for (String taskId : taskIds) {
-          // Pass null get() values through.
-          builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
-        }
-        return builder.build();
-      }
-    });
-  }
-
   private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
     if (task != null) {
       return createStateMachine(task, task.getStatus());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index c1a11bd..76caa62 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -672,8 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return response;
     }
 
-    schedulerCore.setTaskStatus(
-        Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
+    schedulerCore.setTaskStatus(taskId, status, transitionMessage(context.getIdentity()));
     return new Response().setResponseCode(OK).setMessage("Transition attempted.");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index bba1b72..ffc37db 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Iterables;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -88,9 +88,12 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
   @Test
   public void testForwardsStatusUpdates() throws Exception {
-    expect(
-        stateManager.changeState(Query.taskScoped(TASK_ID_A), RUNNING, Optional.of("fake message")))
-        .andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
+        RUNNING,
+        Optional.of("fake message")))
+        .andReturn(true);
 
     control.replay();
 
@@ -114,7 +117,8 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test(expected = StorageException.class)
   public void testFailedStatusUpdate() throws Exception {
     expect(stateManager.changeState(
-        Query.taskScoped(TASK_ID_A),
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
         RUNNING,
         Optional.of("fake message")))
         .andThrow(new StorageException("Injected error"));
@@ -132,10 +136,11 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test
   public void testMemoryLimitTranslationHack() throws Exception {
     expect(stateManager.changeState(
-        Query.taskScoped(TASK_ID_A),
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
         FAILED,
         Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
-        .andReturn(0);
+        .andReturn(false);
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 4dfac03..6c124c5 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -317,10 +317,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
-        Query.taskScoped("a").byStatus(PENDING),
+        "a",
+        Optional.of(PENDING),
         LOST,
         TaskSchedulerImpl.LAUNCH_FAILED_MSG))
-        .andReturn(1);
+        .andReturn(true);
 
     replayAndCreateScheduler();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
index 66bc2a0..a539b59 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
@@ -29,7 +29,6 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -119,10 +118,11 @@ public class TaskThrottlerTest extends EasyMockTest {
 
   private void expectMovedToPending(IScheduledTask task) {
     expect(stateManager.changeState(
-        Query.taskScoped(Tasks.id(task)).byStatus(THROTTLED),
+        Tasks.id(task),
+        Optional.of(THROTTLED),
         PENDING,
         Optional.<String>absent()))
-        .andReturn(1);
+        .andReturn(true);
   }
 
   private IScheduledTask makeTask(String id, ScheduleStatus status) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 023905b..d4685f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
@@ -35,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -175,8 +175,12 @@ public class TaskTimeoutTest extends EasyMockTest {
     expectTaskWatch();
     expectCancel();
     Capture<Runnable> killingTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(true);
 
     replayAndCreate();
 
@@ -188,8 +192,12 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testTimeout() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(ASSIGNED);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(ASSIGNED),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(true);
 
     replayAndCreate();
 
@@ -202,8 +210,12 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testTaskDeleted() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(0);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(false);
 
     replayAndCreate();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index cc929f8..4eeed38 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -1453,7 +1453,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       ScheduleStatus status,
       Optional<String> message) {
 
-    scheduler.setTaskStatus(query, status, message);
+    for (String taskId : Tasks.ids(Storage.Util.consistentFetchTasks(storage, query))) {
+      scheduler.setTaskStatus(taskId, status, message);
+    }
   }
 
   public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index c0addd6..f7f7632 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -55,7 +56,6 @@ import static org.junit.Assert.assertEquals;
 public class MaintenanceControllerImplTest extends EasyMockTest {
 
   private static final String HOST_A = "a";
-  private static final String HOST_B = "b";
   private static final Set<String> A = ImmutableSet.of(HOST_A);
 
   private StorageTestUtil storageUtil;
@@ -102,10 +102,11 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     expectMaintenanceModeChange(HOST_A, SCHEDULED);
     expectFetchTasksByHost(HOST_A, ImmutableSet.<ScheduledTask>of(task));
     expect(stateManager.changeState(
-        Query.slaveScoped(HOST_A).active(),
+        Tasks.id(task),
+        Optional.<ScheduleStatus>absent(),
         ScheduleStatus.RESTARTING,
         MaintenanceControllerImpl.DRAINING_MESSAGE))
-        .andReturn(1);
+        .andReturn(true);
     expectMaintenanceModeChange(HOST_A, DRAINING);
     expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
         .andReturn(Optional.of(new HostAttributes().setHost(HOST_A).setMode(DRAINING)));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 74ec74f..5379300 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -202,8 +202,8 @@ public class StateManagerImplTest extends EasyMockTest {
     control.replay();
 
     insertTask(task, 0);
-    assertEquals(1, changeState(taskId, KILLING));
-    assertEquals(0, changeState(taskId, KILLING));
+    assertEquals(true, changeState(taskId, KILLING));
+    assertEquals(false, changeState(taskId, KILLING));
   }
 
   @Test
@@ -228,17 +228,15 @@ public class StateManagerImplTest extends EasyMockTest {
 
   @Test
   public void testNestedEvents() {
-    String id = "a";
+    final String id = "a";
     ITaskConfig task = makeTask(JIM, MY_JOB);
     expect(taskIdGenerator.generate(task, 0)).andReturn(id);
 
     // Trigger an event that produces a side-effect and a PubSub event .
     eventSink.post(matchStateChange(id, INIT, PENDING));
     expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override
-      public Void answer() throws Throwable {
-        stateManager.changeState(
-            Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
+      @Override public Void answer() throws Throwable {
+        changeState(id, ASSIGNED);
         return null;
       }
     });
@@ -285,6 +283,17 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(taskId, FAILED);
   }
 
+  @Test
+  public void testKillUnknownTask() {
+    String unknownTask = "unknown";
+
+    driver.killTask(unknownTask);
+
+    control.replay();
+
+    changeState(unknownTask, RUNNING);
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,
@@ -311,8 +320,12 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.insertPendingTasks(ImmutableMap.of(instanceId, task));
   }
 
-  private int changeState(String taskId, ScheduleStatus status) {
-    return stateManager.changeState(Query.taskScoped(taskId), status, Optional.<String>absent());
+  private boolean changeState(String taskId, ScheduleStatus status) {
+    return stateManager.changeState(
+        taskId,
+        Optional.<ScheduleStatus>absent(),
+        status,
+        Optional.<String>absent());
   }
 
   private static ITaskConfig makeTask(Identity owner, String job) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 91c1c24..b46f29a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -474,7 +474,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     String taskId = "task_id_foo";
     ScheduleStatus status = ScheduleStatus.FAILED;
 
-    scheduler.setTaskStatus(Query.taskScoped(taskId), status, transitionMessage(USER));
+    scheduler.setTaskStatus(taskId, status, transitionMessage(USER));
     // Expect auth is first called by an interceptor and then by SchedulerThriftInterface to extract
     // the SessionContext.
     // Note: This will change after AOP-style session validation passes in a SessionContext.