You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/13 21:33:41 UTC

[1/3] git commit: Begin cleanup of StateManager interface by removing state change via Query.

Updated Branches:
  refs/heads/wfarner/state_machine_refactor [created] 70adc19ad


Begin cleanup of StateManager interface by removing state change via Query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/801dcfbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/801dcfbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/801dcfbd

Branch: refs/heads/wfarner/state_machine_refactor
Commit: 801dcfbd55e3edfd47e1a504d4fd5e34fc3c2eb6
Parents: 7e87588
Author: Bill Farner <bi...@twitter.com>
Authored: Wed Jan 8 16:18:48 2014 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Mon Jan 13 12:22:05 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/UserTaskLauncher.java      |  4 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  8 +--
 .../aurora/scheduler/async/TaskTimeout.java     |  9 ++-
 .../scheduler/state/MaintenanceController.java  | 21 +++---
 .../aurora/scheduler/state/SchedulerCore.java   |  4 +-
 .../scheduler/state/SchedulerCoreImpl.java      | 60 +++++++++++-----
 .../aurora/scheduler/state/StateManager.java    | 29 +++++---
 .../scheduler/state/StateManagerImpl.java       | 75 ++++++--------------
 .../thrift/SchedulerThriftInterface.java        |  3 +-
 .../aurora/scheduler/UserTaskLauncherTest.java  | 19 +++--
 .../scheduler/async/TaskSchedulerTest.java      |  5 +-
 .../aurora/scheduler/async/TaskTimeoutTest.java | 26 +++++--
 .../state/BaseSchedulerCoreImplTest.java        |  4 +-
 .../state/MaintenanceControllerImplTest.java    |  7 +-
 .../scheduler/state/StateManagerImplTest.java   | 32 ++++++---
 .../thrift/SchedulerThriftInterfaceTest.java    |  2 +-
 16 files changed, 174 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 7e180ac..010776e 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.mesos.Protos.Offer;
@@ -85,7 +84,8 @@ class UserTaskLauncher implements TaskLauncher {
       }
 
       stateManager.changeState(
-          Query.taskScoped(status.getTaskId().getValue()),
+          status.getTaskId().getValue(),
+          Optional.<ScheduleStatus>absent(),
           translatedState,
           Optional.fromNullable(message));
     } catch (SchedulerException e) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 96c76ba..4afc332 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -161,9 +161,9 @@ interface TaskScheduler extends EventSubscriber {
         return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
           @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
             LOG.fine("Attempting to schedule task " + taskId);
-            Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
-            final IScheduledTask task =
-                Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+            final IScheduledTask task = Iterables.getOnlyElement(
+                store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+                null);
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
@@ -182,7 +182,7 @@ interface TaskScheduler extends EventSubscriber {
                 // It is in the LOST state and a new task will move to PENDING to replace it.
                 // Should the state change fail due to storage issues, that's okay.  The task will
                 // time out in the ASSIGNED state and be moved to LOST.
-                stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+                stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 9f14a99..64a1941 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -42,7 +42,6 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -186,9 +185,13 @@ class TaskTimeout implements EventSubscriber {
         // timeout is still valid.  Ideally, the future would have already been canceled, but in the
         // event of a state transition race, including transientState prevents an unintended
         // task timeout.
-        Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
         // Note: This requires LOST transitions trigger Driver.killTask.
-        if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+        if (stateManager.changeState(
+            key.taskId,
+            Optional.of(key.status),
+            ScheduleStatus.LOST,
+            TIMEOUT_MESSAGE)) {
+
           timedOutTasks.incrementAndGet();
         } else {
           LOG.warning("Task " + key + " does not exist, or was not in the expected state.");

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3dd2271..af10d44 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
-import com.twitter.common.base.Closure;
 
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.HostStatus;
@@ -119,11 +118,7 @@ public interface MaintenanceController {
       this.eventSink = checkNotNull(eventSink);
     }
 
-    private Set<HostStatus> watchDrainingTasks(
-        MutableStoreProvider store,
-        Set<String> hosts,
-        Closure<Query.Builder> callback) {
-
+    private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
       Set<String> emptyHosts = Sets.newHashSet();
       for (String host : hosts) {
         // If there are no tasks on the host, immediately transition to DRAINED.
@@ -134,7 +129,13 @@ public interface MaintenanceController {
         if (activeTasks.isEmpty()) {
           emptyHosts.add(host);
         } else {
-          callback.execute(query);
+          for (String taskId : activeTasks) {
+            stateManager.changeState(
+                taskId,
+                Optional.<ScheduleStatus>absent(),
+                ScheduleStatus.RESTARTING,
+                DRAINING_MESSAGE);
+          }
         }
       }
 
@@ -186,11 +187,7 @@ public interface MaintenanceController {
     public Set<HostStatus> drain(final Set<String> hosts) {
       return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
         @Override public Set<HostStatus> apply(MutableStoreProvider store) {
-          return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
-            @Override public void execute(Query.Builder query) {
-              stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
-            }
-          });
+          return watchDrainingTasks(store, hosts);
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index b89d990..7a88629 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -83,11 +83,11 @@ public interface SchedulerCore {
   /**
    * Assigns a new state to tasks.
    *
-   * @param query Builder for a query to identify tasks
+   * @param taskId ID of the task to transition.
    * @param status The new state of the tasks.
    * @param message Additional information about the state transition.
    */
-  void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+  void setTaskStatus(String taskId, ScheduleStatus status, Optional<String> message);
 
   /**
    * Kills a specific set of tasks.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 1d450f2..8dec283 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -75,9 +75,8 @@ class SchedulerCoreImpl implements SchedulerCore {
   // Schedulers that are responsible for triggering execution of jobs.
   private final ImmutableList<JobManager> jobManagers;
 
-  // TODO(Bill Farner): Avoid using StateManagerImpl.
   // State manager handles persistence of task modifications and state transitions.
-  private final StateManagerImpl stateManager;
+  private final StateManager stateManager;
 
   private final TaskIdGenerator taskIdGenerator;
   private final JobFilter jobFilter;
@@ -97,7 +96,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       Storage storage,
       CronJobManager cronScheduler,
       ImmediateJobManager immediateScheduler,
-      StateManagerImpl stateManager,
+      StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
       JobFilter jobFilter) {
 
@@ -118,7 +117,9 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   @Override
   public synchronized void tasksDeleted(Set<String> taskIds) {
-    setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+    for (String taskId : taskIds) {
+      setTaskStatus(taskId, ScheduleStatus.UNKNOWN, Optional.<String>absent());
+    }
   }
 
   @Override
@@ -245,18 +246,20 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   @Override
   public synchronized void setTaskStatus(
-      Query.Builder query,
+      String taskId,
       final ScheduleStatus status,
       Optional<String> message) {
 
-    checkNotNull(query);
+    checkNotNull(taskId);
     checkNotNull(status);
 
-    stateManager.changeState(query, status, message);
+    stateManager.changeState(taskId, Optional.<ScheduleStatus>absent(), status, message);
   }
 
   @Override
-  public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+  public synchronized void killTasks(Query.Builder query, final String user)
+      throws ScheduleException {
+
     checkNotNull(query);
     LOG.info("Killing tasks matching " + query);
 
@@ -274,10 +277,28 @@ class SchedulerCoreImpl implements SchedulerCore {
     }
 
     // Unless statuses were specifically supplied, only attempt to kill active tasks.
-    Query.Builder taskQuery = query.get().isSetStatuses() ? query.byStatus(ACTIVE_STATES) : query;
+    final Query.Builder taskQuery = query.get().isSetStatuses()
+        ? query.byStatus(ACTIVE_STATES)
+        : query;
+
+    int tasksAffected = storage.write(new MutateWork.Quiet<Integer>() {
+      @Override public Integer apply(MutableStoreProvider storeProvider) {
+        int total = 0;
+        for (String taskId : Tasks.ids(storeProvider.getTaskStore().fetchTasks(taskQuery))) {
+          boolean changed = stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              KILLING,
+              Optional.of("Killed by " + user));
+
+          if (changed) {
+            total++;
+          }
+        }
+        return total;
+      }
+    });
 
-    int tasksAffected =
-        stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
     if (!jobDeleted && (tasksAffected == 0)) {
       throw new ScheduleException("No jobs to kill");
     }
@@ -307,22 +328,27 @@ class SchedulerCoreImpl implements SchedulerCore {
           throw new ScheduleException("Not all requested shards are active.");
         }
         LOG.info("Restarting shards matching " + query);
-        stateManager.changeState(
-            Query.taskScoped(Tasks.ids(matchingTasks)),
-            RESTARTING,
-            Optional.of("Restarted by " + requestingUser));
+        for (String taskId : Tasks.ids(matchingTasks)) {
+          stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              RESTARTING,
+              Optional.of("Restarted by " + requestingUser));
+        }
       }
     });
   }
 
-
   @Override
   public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
     checkNotNull(task);
     checkNotNull(preemptingTask);
     // TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
 
-    stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+    stateManager.changeState(
+        task.getTaskId(),
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.PREEMPTING,
         Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 045165d..785a5ef 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -21,27 +21,36 @@ import java.util.Set;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
 /**
- * Thin interface for the state manager.
+ * A manager for the state of tasks.  Most modifications to tasks should be made here, especially
+ * those that alter the {@link ScheduleStatus} of tasks.
  */
 public interface StateManager {
 
   /**
-   * Performs a simple state change, transitioning all tasks matching a query to the given
-   * state and applying the given audit message.
-   * TODO(William Farner): Consider removing the return value.
+   * Attempts to alter a task from its existing state to {@code newState}. If a {@code casState} is
+   * provided, the transition will only be performed if the task is currently in that state.
+   *
+   * @param taskId ID of the task to transition.
+   * @param casState State that the task must be in for the operation to proceed.  If the task
+   *                 is found to not be in {@code casState}, no action is performed and
+   *                 {@code false} is returned.
+   * @param newState State to move the task to.
+   * @param auditMessage Message to include with the transition.
+   * @return {@code true} if the transition was performed and the task was moved to
+   *         {@code newState}, {@code false} if the transition was not allowed, or the task was not
+   *         in {@code casState}.
    *
-   * @param query Builder of the query to perform, the results of which will be modified.
-   * @param newState State to move the resulting tasks into.
-   * @param auditMessage Audit message to apply along with the state change.
-   * @return the number of successful state changes.
    */
-  int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+  boolean changeState(
+      String taskId,
+      Optional<ScheduleStatus> casState,
+      ScheduleStatus newState,
+      Optional<String> auditMessage);
 
   /**
    * Assigns a task to a specific slave.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index b6dd537..6078eee 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,8 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -53,8 +51,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -76,8 +72,8 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
  * persisted state machine transitions, and their side-effects.
  *
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
+ * TODO(wfarner): This class is due for an overhaul.  There are several aspects of it that could
+ * probably be made much simpler.  The first phase is removing state changes made via Query.
  */
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
@@ -195,13 +191,15 @@ public class StateManagerImpl implements StateManager {
   }
 
   @Override
-  public int changeState(
-      Query.Builder query,
+  public boolean changeState(
+      String taskId,
+      Optional<ScheduleStatus> casState,
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(query, new Function<TaskStateMachine, Boolean>() {
-      @Override public Boolean apply(TaskStateMachine stateMachine) {
+    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
+      @Override
+      public Boolean apply(TaskStateMachine stateMachine) {
         return stateMachine.updateState(newState, auditMessage);
       }
     });
@@ -219,34 +217,26 @@ public class StateManagerImpl implements StateManager {
     checkNotNull(assignedPorts);
 
     TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(Query.taskScoped(taskId), mutation);
+    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
 
     return mutation.getAssignedTask();
   }
 
-  private int changeStateInWriteOperation(
-      Set<String> taskIds,
-      Function<TaskStateMachine, Boolean> stateChange) {
-
-    int count = 0;
-    for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
-      if (stateChange.apply(stateMachine)) {
-        ++count;
-      }
-    }
-    return count;
-  }
-
-  private int changeState(
-      final Query.Builder query,
+  private boolean changeState(
+      final String taskId,
+      final Optional<ScheduleStatus> casState,
       final Function<TaskStateMachine, Boolean> stateChange) {
 
-    return storage.write(storage.new QuietSideEffectWork<Integer>() {
-      @Override public Integer apply(MutableStoreProvider storeProvider) {
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        return changeStateInWriteOperation(ids, stateChange);
+    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        IScheduledTask task = Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null);
+        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
+          return false;
+        }
+
+        return stateChange.apply(getStateMachine(taskId, task));
       }
     });
   }
@@ -405,27 +395,6 @@ public class StateManagerImpl implements StateManager {
     });
   }
 
-  private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
-    return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
-      @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
-        Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
-            new Function<IScheduledTask, String>() {
-              @Override public String apply(IScheduledTask input) {
-                return input.getAssignedTask().getTaskId();
-              }
-            });
-
-        ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
-        for (String taskId : taskIds) {
-          // Pass null get() values through.
-          builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
-        }
-        return builder.build();
-      }
-    });
-  }
-
   private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
     if (task != null) {
       return createStateMachine(task, task.getStatus());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index c1a11bd..76caa62 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -672,8 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return response;
     }
 
-    schedulerCore.setTaskStatus(
-        Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
+    schedulerCore.setTaskStatus(taskId, status, transitionMessage(context.getIdentity()));
     return new Response().setResponseCode(OK).setMessage("Transition attempted.");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index bba1b72..ffc37db 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Iterables;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -88,9 +88,12 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
   @Test
   public void testForwardsStatusUpdates() throws Exception {
-    expect(
-        stateManager.changeState(Query.taskScoped(TASK_ID_A), RUNNING, Optional.of("fake message")))
-        .andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
+        RUNNING,
+        Optional.of("fake message")))
+        .andReturn(true);
 
     control.replay();
 
@@ -114,7 +117,8 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test(expected = StorageException.class)
   public void testFailedStatusUpdate() throws Exception {
     expect(stateManager.changeState(
-        Query.taskScoped(TASK_ID_A),
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
         RUNNING,
         Optional.of("fake message")))
         .andThrow(new StorageException("Injected error"));
@@ -132,10 +136,11 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test
   public void testMemoryLimitTranslationHack() throws Exception {
     expect(stateManager.changeState(
-        Query.taskScoped(TASK_ID_A),
+        TASK_ID_A,
+        Optional.<ScheduleStatus>absent(),
         FAILED,
         Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
-        .andReturn(0);
+        .andReturn(false);
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9698f28..d06ea11 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -324,10 +324,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
-        Query.taskScoped("a").byStatus(PENDING),
+        "a",
+        Optional.of(PENDING),
         LOST,
         TaskSchedulerImpl.LAUNCH_FAILED_MSG))
-        .andReturn(1);
+        .andReturn(true);
 
     replayAndCreateScheduler();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 023905b..d4685f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
@@ -35,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -175,8 +175,12 @@ public class TaskTimeoutTest extends EasyMockTest {
     expectTaskWatch();
     expectCancel();
     Capture<Runnable> killingTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(true);
 
     replayAndCreate();
 
@@ -188,8 +192,12 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testTimeout() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(ASSIGNED);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(ASSIGNED),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(true);
 
     replayAndCreate();
 
@@ -202,8 +210,12 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testTaskDeleted() throws Exception {
     Capture<Runnable> assignedTimeout = expectTaskWatch();
-    Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
-    expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(0);
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(false);
 
     replayAndCreate();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 720d0c8..6ddea9f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -1453,7 +1453,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       ScheduleStatus status,
       Optional<String> message) {
 
-    scheduler.setTaskStatus(query, status, message);
+    for (String taskId : Tasks.ids(Storage.Util.consistentFetchTasks(storage, query))) {
+      scheduler.setTaskStatus(taskId, status, message);
+    }
   }
 
   public void changeStatus(Query.Builder query, ScheduleStatus status) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index c0addd6..f7f7632 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -55,7 +56,6 @@ import static org.junit.Assert.assertEquals;
 public class MaintenanceControllerImplTest extends EasyMockTest {
 
   private static final String HOST_A = "a";
-  private static final String HOST_B = "b";
   private static final Set<String> A = ImmutableSet.of(HOST_A);
 
   private StorageTestUtil storageUtil;
@@ -102,10 +102,11 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     expectMaintenanceModeChange(HOST_A, SCHEDULED);
     expectFetchTasksByHost(HOST_A, ImmutableSet.<ScheduledTask>of(task));
     expect(stateManager.changeState(
-        Query.slaveScoped(HOST_A).active(),
+        Tasks.id(task),
+        Optional.<ScheduleStatus>absent(),
         ScheduleStatus.RESTARTING,
         MaintenanceControllerImpl.DRAINING_MESSAGE))
-        .andReturn(1);
+        .andReturn(true);
     expectMaintenanceModeChange(HOST_A, DRAINING);
     expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
         .andReturn(Optional.of(new HostAttributes().setHost(HOST_A).setMode(DRAINING)));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b17b983..b93e47f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -52,6 +52,7 @@ import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
@@ -191,8 +192,8 @@ public class StateManagerImplTest extends EasyMockTest {
     control.replay();
 
     insertTask(task, 0);
-    assertEquals(1, changeState(taskId, KILLING));
-    assertEquals(0, changeState(taskId, KILLING));
+    assertEquals(true, changeState(taskId, KILLING));
+    assertEquals(false, changeState(taskId, KILLING));
   }
 
   @Test
@@ -217,16 +218,16 @@ public class StateManagerImplTest extends EasyMockTest {
 
   @Test
   public void testNestedEvents() {
-    String id = "a";
+    final String id = "a";
     ITaskConfig task = makeTask(JIM, MY_JOB);
     expect(taskIdGenerator.generate(task, 0)).andReturn(id);
 
     // Trigger an event that produces a side-effect and a PubSub event.
     eventSink.post(matchStateChange(id, INIT, PENDING));
     expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override public Void answer() throws Throwable {
-        stateManager.changeState(
-            Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
+      @Override
+      public Void answer() throws Throwable {
+        changeState(id, ASSIGNED);
         return null;
       }
     });
@@ -253,6 +254,17 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.deleteTasks(ImmutableSet.of(taskId));
   }
 
+  @Test
+  public void testKillUnknownTask() {
+    String unknownTask = "unknown";
+
+    driver.killTask(unknownTask);
+
+    control.replay();
+
+    changeState(unknownTask, RUNNING);
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,
@@ -279,8 +291,12 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.insertPendingTasks(ImmutableMap.of(instanceId, task));
   }
 
-  private int changeState(String taskId, ScheduleStatus status) {
-    return stateManager.changeState(Query.taskScoped(taskId), status, Optional.<String>absent());
+  private boolean changeState(String taskId, ScheduleStatus status) {
+    return stateManager.changeState(
+        taskId,
+        Optional.<ScheduleStatus>absent(),
+        status,
+        Optional.<String>absent());
   }
 
   private static ITaskConfig makeTask(Identity owner, String job) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 91c1c24..b46f29a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -474,7 +474,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     String taskId = "task_id_foo";
     ScheduleStatus status = ScheduleStatus.FAILED;
 
-    scheduler.setTaskStatus(Query.taskScoped(taskId), status, transitionMessage(USER));
+    scheduler.setTaskStatus(taskId, status, transitionMessage(USER));
     // Expect auth is first called by an interceptor and then by SchedulerThriftInterface to extract
     // the SessionContext.
     // Note: This will change after AOP-style session validation passes in a SessionContext.


[2/3] Refactor StateManagerImpl and TaskStateMachine for less code and better readability.

Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index e89e60a..0a3a520 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -15,23 +15,27 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Function;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.TaskStateMachine.WorkSink;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,74 +47,43 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.WorkCommand.DELETE;
-import static org.apache.aurora.scheduler.state.WorkCommand.INCREMENT_FAILURES;
-import static org.apache.aurora.scheduler.state.WorkCommand.KILL;
-import static org.apache.aurora.scheduler.state.WorkCommand.RESCHEDULE;
-import static org.apache.aurora.scheduler.state.WorkCommand.UPDATE_STATE;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TaskStateMachineTest extends EasyMockTest {
+// TODO(wfarner): At this rate, it's probably best to exhaustively cover this class with a matrix
+// from every state to every state.
+public class TaskStateMachineTest {
 
-  private WorkSink workSink;
-  private FakeClock clock;
   private TaskStateMachine stateMachine;
 
   @Before
   public void setUp() {
-    workSink = createMock(WorkSink.class);
-    clock = new FakeClock();
-    stateMachine = makeStateMachine("test", makeTask(false));
+    stateMachine = makeStateMachine(makeTask(false));
   }
 
-  private TaskStateMachine makeStateMachine(String taskId, ScheduledTask builder) {
-    return new TaskStateMachine(
-        taskId,
-        IScheduledTask.build(builder),
-        workSink,
-        clock,
-        INIT);
+  private TaskStateMachine makeStateMachine(ScheduledTask builder) {
+    return new TaskStateMachine(IScheduledTask.build(builder));
   }
 
   @Test
   public void testSimpleTransition() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(DELETE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING);
-    assertEquals(INIT, stateMachine.getPreviousState());
-    transition(stateMachine, ASSIGNED);
-    assertEquals(PENDING, stateMachine.getPreviousState());
-    transition(stateMachine, STARTING);
-    assertEquals(ASSIGNED, stateMachine.getPreviousState());
-    transition(stateMachine, RUNNING);
-    assertEquals(STARTING, stateMachine.getPreviousState());
-    transition(stateMachine, FINISHED);
-    assertEquals(RUNNING, stateMachine.getPreviousState());
-    transition(stateMachine, UNKNOWN);
-    assertEquals(FINISHED, stateMachine.getPreviousState());
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+    legalTransition(UNKNOWN, Action.DELETE);
   }
 
   @Test
   public void testServiceRescheduled() {
-    stateMachine = makeStateMachine("test", makeTask(true));
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+    stateMachine = makeStateMachine(makeTask(true));
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
   }
 
   @Test
@@ -118,12 +91,12 @@ public class TaskStateMachineTest extends EasyMockTest {
     Set<ScheduleStatus> terminalStates = Tasks.TERMINAL_STATES;
 
     for (ScheduleStatus endState : terminalStates) {
-      stateMachine = makeStateMachine("test", makeTask(false));
-      expectWork(UPDATE_STATE).times(5);
+      stateMachine = makeStateMachine(makeTask(false));
+      Set<SideEffect.Action> finalActions = Sets.newHashSet(Action.SAVE_STATE);
 
       switch (endState) {
         case FAILED:
-          expectWork(INCREMENT_FAILURES);
+          finalActions.add(Action.INCREMENT_FAILURES);
           break;
 
         case FINISHED:
@@ -131,138 +104,205 @@ public class TaskStateMachineTest extends EasyMockTest {
 
         case KILLED:
         case LOST:
-          expectWork(RESCHEDULE);
+          finalActions.add(Action.RESCHEDULE);
           break;
 
         case KILLING:
-          expectWork(KILL);
+          finalActions.add(Action.KILL);
           break;
 
         default:
           fail("Unknown state " + endState);
       }
 
-      control.replay();
-
-      transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, endState);
+      expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+      legalTransition(endState, finalActions);
 
       for (ScheduleStatus badTransition : terminalStates) {
-        transition(stateMachine, badTransition);
+        illegalTransition(badTransition);
       }
-
-      control.verify();
-      control.reset();
     }
-
-    control.replay();  // Needed so the teardown verify doesn't break.
   }
 
   @Test
   public void testUnknownTask() {
-    expectWork(KILL);
-
-    control.replay();
+    stateMachine = new TaskStateMachine("id");
 
-    transition(stateMachine, UNKNOWN, RUNNING);
+    illegalTransition(RUNNING, Action.KILL);
   }
 
   @Test
   public void testLostTask() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, LOST);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE);
   }
 
   @Test
   public void testKilledPending() {
-    expectWork(UPDATE_STATE);
-    expectWork(DELETE);
+    expectUpdateStateOnTransitionTo(PENDING);
+    legalTransition(KILLING, Action.DELETE);
+  }
 
-    control.replay();
+  @Test
+  public void testMissingStartingRescheduledImmediately() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    illegalTransition(UNKNOWN,
+        ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
+  }
 
-    transition(stateMachine, PENDING, KILLING);
+  @Test
+  public void testMissingRunningRescheduledImmediately() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    illegalTransition(UNKNOWN,
+        ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
   }
 
   @Test
-  public void testMissingStartingRescheduledImmediately() {
-    ScheduledTask task = makeTask(false);
-    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
-    stateMachine = makeStateMachine("test", task);
+  public void testRestartedTask() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
+  }
 
-    expectWork(UPDATE_STATE).times(4);
-    expectWork(RESCHEDULE);
+  @Test
+  public void testRogueRestartedTask() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    illegalTransition(RUNNING, Action.KILL);
+  }
 
-    control.replay();
+  @Test
+  public void testPendingRestartedTask() {
+    expectUpdateStateOnTransitionTo(PENDING);
+    // PENDING -> RESTARTING should not be allowed.
+    illegalTransition(RESTARTING);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, UNKNOWN);
-    assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+  @Test
+  public void testAllowsSkipStartingAndRunning() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, FINISHED);
   }
 
   @Test
-  public void testMissingRunningRescheduledImmediately() {
-    ScheduledTask task = makeTask(false);
-    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
-    stateMachine = makeStateMachine("test", task);
+  public void testAllowsSkipRunning() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, FINISHED);
+  }
 
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
+  @Test
+  public void testRestartingToAssigned() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    illegalTransition(ASSIGNED, Action.KILL);
+  }
 
-    control.replay();
+  @Test
+  public void testRestartingToStarting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    illegalTransition(STARTING, Action.KILL);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, UNKNOWN);
-    assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+  @Test
+  public void testRestartingToFailed() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    legalTransition(FAILED, Action.RESCHEDULE, Action.SAVE_STATE);
   }
 
   @Test
-  public void testRestartedTask() {
-    expectWork(UPDATE_STATE).times(6);
-    expectWork(KILL);
-    expectWork(RESCHEDULE);
+  public void testRestartingToKilled() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    legalTransition(KILLED, Action.RESCHEDULE, Action.SAVE_STATE);
+  }
 
-    control.replay();
+  @Test
+  public void testRestartingToLost() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    legalTransition(LOST, Action.SAVE_STATE, Action.KILL, Action.RESCHEDULE);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, FINISHED);
+  @Test
+  public void testRestartingToUnknown() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    illegalTransition(UNKNOWN,
+        ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
   }
 
   @Test
-  public void testRogueRestartedTask() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(KILL).times(2);
+  public void testAssignedToPreempting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
+  }
 
-    control.replay();
+  @Test
+  public void testAssignedToFailed() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, RUNNING);
+  @Test
+  public void testAssignedToRestarting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
   }
 
   @Test
-  public void testPendingRestartedTask() {
-    expectWork(UPDATE_STATE).times(1);
+  public void testAssignedToKilled() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(KILLED, Action.SAVE_STATE, Action.RESCHEDULE);
+  }
 
-    control.replay();
+  @Test
+  public void testAssignedToLost() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE, Action.KILL);
+  }
 
-    // PENDING -> RESTARTING should not be allowed.
-    transition(stateMachine, PENDING, RESTARTING);
+  @Test
+  public void testAssignedToKilling() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    legalTransition(KILLING, Action.SAVE_STATE, Action.KILL);
   }
 
   @Test
-  public void testAllowsSkipStartingAndRunning() {
-    expectWork(UPDATE_STATE).times(3);
+  public void testAssignedToMissing() {
+    // No action is taken for ASSIGNED -> UNKNOWN since this usually means the slave has not yet
+    // received the task.  We rely on task timeouts to clean up tasks stalled in ASSIGNED.
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+    illegalTransition(UNKNOWN);
+  }
 
-    control.replay();
+  @Test
+  public void testStartingToRestarting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, FINISHED);
+  @Test
+  public void testStartingToPreempting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
   }
 
   @Test
-  public void testAllowsSkipRunning() {
-    expectWork(UPDATE_STATE).times(4);
+  public void testStartingToKilled() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    legalTransition(KILLED, Action.SAVE_STATE, Action.RESCHEDULE);
+  }
 
-    control.replay();
+  @Test
+  public void testStartingToLost() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE);
+  }
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, FINISHED);
+  @Test
+  public void testRunningToPreempting() {
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, RUNNING);
+    legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
   }
 
   @Test
@@ -270,23 +310,15 @@ public class TaskStateMachineTest extends EasyMockTest {
     ScheduledTask task = makeTask(false);
     task.getAssignedTask().getTask().setMaxTaskFailures(10);
     task.setFailureCount(8);
-    stateMachine = makeStateMachine("test", task);
-
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-    expectWork(INCREMENT_FAILURES);
+    stateMachine = makeStateMachine(task);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
 
     ScheduledTask rescheduled = task.deepCopy();
     rescheduled.setFailureCount(9);
-    TaskStateMachine rescheduledMachine = makeStateMachine("test2", rescheduled);
-    expectWork(UPDATE_STATE, rescheduledMachine).times(5);
-    expectWork(INCREMENT_FAILURES, rescheduledMachine);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
-
-    transition(rescheduledMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    stateMachine = makeStateMachine(rescheduled);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
   }
 
   @Test
@@ -294,53 +326,306 @@ public class TaskStateMachineTest extends EasyMockTest {
     ScheduledTask task = makeTask(false);
     task.getAssignedTask().getTask().setMaxTaskFailures(-1);
     task.setFailureCount(1000);
-    stateMachine = makeStateMachine("test", task);
+    stateMachine = makeStateMachine(task);
 
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-    expectWork(INCREMENT_FAILURES);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
   }
 
   @Test
   public void testKillingRequest() {
-    expectWork(UPDATE_STATE).times(6);
-    expectWork(KILL);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(KILLING, Action.KILL, Action.SAVE_STATE);
+    expectUpdateStateOnTransitionTo(KILLED);
+  }
 
-    control.replay();
+  private static final Function<Action, SideEffect> TO_SIDE_EFFECT =
+      new Function<Action, SideEffect>() {
+        @Override public SideEffect apply(Action action) {
+          return new SideEffect(action, Optional.<ScheduleStatus>absent());
+        }
+      };
 
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
+  private void legalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    legalTransition(state, ImmutableSet.copyOf(expectedActions));
   }
 
-  private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
+  private void legalTransition(ScheduleStatus state, Set<SideEffect.Action> expectedActions) {
+    ScheduleStatus initialStatus = stateMachine.getState();
+    TransitionResult result = stateMachine.updateState(state);
+    assertTrue("Transition to " + state + " was not successful", result.isSuccess());
+    assertEquals(initialStatus, stateMachine.getPreviousState());
+    assertEquals(state, stateMachine.getState());
+    assertEquals(
+        FluentIterable.from(expectedActions).transform(TO_SIDE_EFFECT).toSet(),
+        result.getSideEffects());
+  }
+
+  private void expectUpdateStateOnTransitionTo(ScheduleStatus... states) {
     for (ScheduleStatus status : states) {
-      stateMachine.updateState(status);
+      legalTransition(status, Action.SAVE_STATE);
     }
   }
 
-  private IExpectationSetters<Void> expectWork(WorkCommand work) {
-    return expectWork(work, stateMachine);
+  private void illegalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    illegalTransition(
+        state,
+        FluentIterable.from(
+            ImmutableSet.copyOf(expectedActions)).transform(TO_SIDE_EFFECT).toSet());
   }
 
-  private IExpectationSetters<Void> expectWork(WorkCommand work, TaskStateMachine machine) {
-    workSink.addWork(
-        eq(work),
-        eq(machine),
-        EasyMock.<Function<IScheduledTask, IScheduledTask>>anyObject());
-    return expectLastCall();
+  private void illegalTransition(ScheduleStatus state, Set<SideEffect> sideEffects) {
+    ScheduleStatus initialStatus = stateMachine.getState();
+    TransitionResult result = stateMachine.updateState(state);
+    assertEquals(initialStatus, stateMachine.getState());
+    assertFalse(result.isSuccess());
+    assertEquals(sideEffects, result.getSideEffects());
   }
 
   private static ScheduledTask makeTask(boolean service) {
     return new ScheduledTask()
+        .setStatus(INIT)
         .setAssignedTask(
             new AssignedTask()
+                .setTaskId("test")
                 .setTask(
                     new TaskConfig()
                         .setOwner(new Identity().setRole("roleA"))
                         .setJobName("jobA")
                         .setIsService(service)));
   }
+
+  private static final TransitionResult LEGAL_NO_ACTION =
+      new TransitionResult(true, ImmutableSet.<SideEffect>of());
+  private static final TransitionResult SAVE = new TransitionResult(
+      true,
+      ImmutableSet.of(new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_AND_KILL = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_AND_RESCHEDULE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.RESCHEDULE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_KILL_AND_RESCHEDULE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.RESCHEDULE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult ILLEGAL_KILL = new TransitionResult(
+      false,
+      ImmutableSet.of(new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult RECORD_FAILURE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.INCREMENT_FAILURES, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult DELETE_TASK = new TransitionResult(
+      true,
+      ImmutableSet.of(new SideEffect(Action.DELETE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult MARK_LOST = new TransitionResult(
+      false,
+      ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
+
+  private static final class TestCase {
+    private final boolean taskPresent;
+    private final ScheduleStatus from;
+    private final ScheduleStatus to;
+
+    private TestCase(boolean taskPresent, ScheduleStatus from, ScheduleStatus to) {
+      this.taskPresent = taskPresent;
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskPresent, from, to);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TestCase)) {
+        return false;
+      }
+
+      TestCase other = (TestCase) o;
+      return (taskPresent == other.taskPresent)
+          && (from == other.from)
+          && (to == other.to);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("taskPresent", taskPresent)
+          .add("from", from)
+          .add("to", to)
+          .toString();
+    }
+  }
+
+  // Expected transition result for each (taskPresent, from, to) combination.
+  // Combinations that are not listed here fall back to the default built in
+  // exhaustivelyTestTransitions(): a TransitionResult created with false and no side effects.
+  private static final Map<TestCase, TransitionResult> EXPECTATIONS =
+      ImmutableMap.<TestCase, TransitionResult>builder()
+          .put(new TestCase(true, INIT, PENDING), SAVE)
+          .put(new TestCase(false, INIT, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, INIT, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, INIT, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, INIT, UNKNOWN), LEGAL_NO_ACTION)
+          .put(new TestCase(false, THROTTLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, THROTTLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, THROTTLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PENDING, ASSIGNED), SAVE)
+          .put(new TestCase(false, PENDING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, PENDING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, PENDING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PENDING, KILLING), DELETE_TASK)
+          .put(new TestCase(false, ASSIGNED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, STARTING), SAVE)
+          .put(new TestCase(false, ASSIGNED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, RUNNING), SAVE)
+          .put(new TestCase(false, ASSIGNED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, FINISHED), SAVE)
+          .put(new TestCase(true, ASSIGNED, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, ASSIGNED, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, ASSIGNED, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, STARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, STARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, STARTING, RUNNING), SAVE)
+          .put(new TestCase(false, STARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, STARTING, FINISHED), SAVE)
+          .put(new TestCase(true, STARTING, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, STARTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, STARTING, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, STARTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(false, RUNNING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, RUNNING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, RUNNING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, RUNNING, FINISHED), SAVE)
+          .put(new TestCase(true, RUNNING, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, RUNNING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RUNNING, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RUNNING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, FINISHED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, FINISHED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, FAILED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, KILLING), SAVE)
+          .put(new TestCase(true, PREEMPTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, FINISHED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, FAILED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, KILLING), SAVE)
+          .put(new TestCase(true, RESTARTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, FAILED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, KILLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, KILLING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, FINISHED), SAVE)
+          .put(new TestCase(true, KILLING, FAILED), SAVE)
+          .put(new TestCase(true, KILLING, KILLED), SAVE)
+          .put(new TestCase(true, KILLING, LOST), SAVE)
+          .put(new TestCase(true, KILLING, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, LOST, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(false, UNKNOWN, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, UNKNOWN, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, UNKNOWN, RUNNING), ILLEGAL_KILL)
+          .build();
+
+  /**
+   * Walks every combination of starting state, target state, and task presence, and checks
+   * the result of {@code updateState} against {@link #EXPECTATIONS}.  Combinations that are
+   * not listed there are expected to produce a result built from {@code false} and no side
+   * effects.
+   */
+  @Test
+  public void exhaustivelyTestTransitions() {
+    for (ScheduleStatus from : ScheduleStatus.values()) {
+      for (ScheduleStatus to : ScheduleStatus.values()) {
+        for (Boolean taskPresent : ImmutableList.of(Boolean.TRUE, Boolean.FALSE)) {
+          TestCase testCase = new TestCase(taskPresent, from, to);
+
+          TransitionResult expectation = EXPECTATIONS.get(testCase);
+          if (expectation == null) {
+            expectation = new TransitionResult(false, ImmutableSet.<SideEffect>of());
+          }
+
+          TaskStateMachine machine;
+          if (taskPresent) {
+            // Cannot create a state machine for an UNKNOWN task that is in the store.
+            boolean expectException = from == UNKNOWN;
+            try {
+              machine =
+                  new TaskStateMachine(IScheduledTask.build(makeTask(false).setStatus(from)));
+              if (expectException) {
+                // Name the failing combination; a bare fail() gives no hint of what broke.
+                fail("Expected an IllegalStateException when creating a state machine for "
+                    + testCase);
+              }
+            } catch (IllegalStateException e) {
+              if (!expectException) {
+                throw e;
+              }
+              // Construction was rejected as expected, so there is no machine to exercise.
+              continue;
+            }
+          } else {
+            machine = new TaskStateMachine("name");
+          }
+
+          assertEquals(
+              "Unexpected behavior for " + testCase,
+              expectation,
+              machine.updateState(to));
+        }
+      }
+    }
+  }
 }


[3/3] git commit: Refactor StateManagerImpl and TaskStateMachine for less code and better readability.

Posted by wf...@apache.org.
Refactor StateManagerImpl and TaskStateMachine for less code and better readability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/70adc19a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/70adc19a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/70adc19a

Branch: refs/heads/wfarner/state_machine_refactor
Commit: 70adc19ad99be08de93fd2b501b7f8bab83b0953
Parents: 801dcfb
Author: Bill Farner <bi...@twitter.com>
Authored: Thu Jan 9 16:07:56 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Jan 13 12:26:17 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/state/SideEffect.java      |  94 +++
 .../scheduler/state/SideEffectStorage.java      | 169 ------
 .../scheduler/state/StateManagerImpl.java       | 500 +++++++--------
 .../scheduler/state/TaskStateMachine.java       | 546 +++++++----------
 .../aurora/scheduler/state/WorkCommand.java     |  33 -
 .../aurora/scheduler/storage/Storage.java       |   4 +
 .../aurora/scheduler/storage/TaskStore.java     |   2 +
 .../scheduler/state/StateManagerImplTest.java   |  95 ++-
 .../scheduler/state/TaskStateMachineTest.java   | 601 ++++++++++++++-----
 9 files changed, 1116 insertions(+), 928 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..5759691
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * A unit of external work that a task state machine may trigger: an {@link Action} plus,
+ * for {@link Action#STATE_CHANGE}, the state the task should move to next.
+ * Instances are immutable and compare by value.
+ */
+class SideEffect {
+  private final Action action;
+  private final Optional<ScheduleStatus> nextState;
+
+  /**
+   * Creates a side effect.
+   *
+   * @param action The work to perform.
+   * @param nextState The state to move to; must be present when {@code action} is
+   *     {@link Action#STATE_CHANGE}.
+   * @throws NullPointerException If either argument is null.
+   * @throws IllegalArgumentException If {@code action} is a state change and no next state
+   *     is provided.
+   */
+  SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+    // Reject nulls here rather than later: toString() dereferences nextState for every
+    // action, so a null accepted at construction would only surface when logging or when
+    // building an assertion message.
+    this.action = Preconditions.checkNotNull(action);
+    this.nextState = Preconditions.checkNotNull(nextState);
+    if (action == Action.STATE_CHANGE) {
+      Preconditions.checkArgument(
+          nextState.isPresent(),
+          "A next state must be provided for a state change action.");
+    }
+  }
+
+  /**
+   * @return The work to perform.
+   */
+  public Action getAction() {
+    return action;
+  }
+
+  /**
+   * @return The state to move to, always present for {@link Action#STATE_CHANGE}.
+   */
+  public Optional<ScheduleStatus> getNextState() {
+    return nextState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equal(action, other.action)
+        && Objects.equal(nextState, other.nextState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(action, nextState);
+  }
+
+  @Override
+  public String toString() {
+    // Renders as "ACTION" or "ACTION NEXT_STATE".
+    if (nextState.isPresent()) {
+      return action.toString() + " " + nextState.get();
+    } else {
+      return action.toString();
+    }
+  }
+
+  enum Action {
+    // Send an instruction for the runner of this task to kill the task.
+    KILL,
+
+    // Create a new state machine with a copy of this task.
+    RESCHEDULE,
+
+    // Update the task's state (schedule status) in the persistent store to match the state machine.
+    SAVE_STATE,
+
+    // Delete this task from the persistent store.
+    DELETE,
+
+    // Increment the failure count for this task.
+    INCREMENT_FAILURES,
+
+    // Perform an additional state change on the task.
+    STATE_CHANGE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
-  private final Queue<PubsubEvent> events = Lists.newLinkedList();
-  @VisibleForTesting
-  Queue<PubsubEvent> getEvents() {
-    return events;
-  }
-
-  private AtomicBoolean inOperation = new AtomicBoolean(false);
-
-  private final Storage storage;
-  private final OperationFinalizer operationFinalizer;
-  private final EventSink eventSink;
-
-  interface OperationFinalizer {
-    /**
-     * Performs any work necessary to complete the operation.
-     * This is executed in the context of a write operation, immediately after the work
-     * executes normally.
-     * NOTE: At present, this is executed for every nesting level of operations, rather than
-     * at the completion of the top-level operation.
-     * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
-     * for more detail.
-     *
-     * @param work Work to finalize.
-     * @param storeProvider Mutable store reference.
-     */
-    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
-  }
-
-  SideEffectStorage(
-      Storage storage,
-      OperationFinalizer operationFinalizer,
-      EventSink eventSink) {
-
-    this.storage = checkNotNull(storage);
-    this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.eventSink = checkNotNull(eventSink);
-  }
-
-  /**
-   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
-   *
-   * @param work Work to perform.
-   * @param <T> Work return type
-   * @param <E> Work exception type.
-   * @return The work return value.
-   * @throws E The work exception.
-   */
-  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-    return storage.write(executeSideEffectsAfter(work));
-  }
-
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  /**
-   * Work that has side effects external to the storage system.
-   * Work may add side effect and pubsub events, which will be executed/sent upon normal
-   * completion of the operation.
-   *
-   * @param <T> Work return type.
-   * @param <E> Work exception type.
-   */
-  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
-    protected final void addTaskEvent(PubsubEvent notice) {
-      Preconditions.checkState(inOperation.get());
-      events.add(Preconditions.checkNotNull(notice));
-    }
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions.
-   *
-   * @param <T>   Work return type.
-   */
-  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
-  }
-
-  /**
-   * Work with side effects that does not have a return value.
-   *
-   * @param <E> Work exception type.
-   */
-  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-      execute(storeProvider);
-      return null;
-    }
-
-    abstract void execute(MutableStoreProvider storeProvider) throws E;
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions or have a return
-   * value.
-   */
-  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
-  }
-
-  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
-      final SideEffectWork<T, E> work) {
-
-    return new MutateWork<T, E>() {
-      @Override public T apply(MutableStoreProvider storeProvider) throws E {
-        boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
-        try {
-          T result = work.apply(storeProvider);
-
-          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
-          // seems wrong.  Double-check whether this is necessary, or if only the top-level
-          // operation should be executing the finalizer.  Update doc on OperationFinalizer
-          // once this is assessed.
-          operationFinalizer.finalize(work, storeProvider);
-          if (topLevelOperation) {
-            while (!events.isEmpty()) {
-              eventSink.post(events.remove());
-            }
-          }
-          return result;
-        } finally {
-          if (topLevelOperation) {
-            inOperation.set(false);
-          }
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6078eee..819d921 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,42 +15,48 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -62,11 +68,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.transform;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
 
 /**
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
@@ -78,38 +82,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  private final SideEffectStorage storage;
-  @VisibleForTesting
-  SideEffectStorage getStorage() {
-    return storage;
-  }
-
+  private final Storage storage;
+  private final Clock clock;
+  private final Driver driver;
   private final TaskIdGenerator taskIdGenerator;
-
-  // Work queue to receive state machine side effect work.
-  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
-  // a delete is always processed after a state transition.
-  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
-      new Comparator<WorkEntry>() {
-        @Override public int compare(WorkEntry a, WorkEntry b) {
-          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
-            return (a.command == WorkCommand.DELETE) ? 1 : -1;
-          } else {
-            return 0;
-          }
-        }
-      });
-
-  // Adapt the work queue into a sink.
-  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
-      @Override public void addWork(
-          WorkCommand work,
-          TaskStateMachine stateMachine,
-          Function<IScheduledTask, IScheduledTask> mutation) {
-
-        workQueue.add(new WorkEntry(work, stateMachine, mutation));
-      }
-    };
+  private final EventSink eventSink;
 
   private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
       new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
@@ -125,28 +102,6 @@ public class StateManagerImpl implements StateManager {
         }
       };
 
-  private final Driver driver;
-  private final Clock clock;
-
-  /**
-   * An item of work on the work queue.
-   */
-  private static class WorkEntry {
-    private final WorkCommand command;
-    private final TaskStateMachine stateMachine;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    WorkEntry(
-        WorkCommand command,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      this.command = command;
-      this.stateMachine = stateMachine;
-      this.mutation = mutation;
-    }
-  }
-
   @Inject
   StateManagerImpl(
       final Storage storage,
@@ -155,20 +110,11 @@ public class StateManagerImpl implements StateManager {
       TaskIdGenerator taskIdGenerator,
       EventSink eventSink) {
 
-    checkNotNull(storage);
+    this.storage = checkNotNull(storage);
     this.clock = checkNotNull(clock);
-
-    OperationFinalizer finalizer = new OperationFinalizer() {
-      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
-        processWorkQueueInWriteOperation(work, store);
-      }
-    };
-
-    this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
-    Stats.exportSize("work_queue_depth", workQueue);
+    this.eventSink = checkNotNull(eventSink);
   }
 
   @Override
@@ -179,12 +125,16 @@ public class StateManagerImpl implements StateManager {
     final Set<IScheduledTask> scheduledTasks =
         ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
 
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
         for (IScheduledTask task : scheduledTasks) {
-          createStateMachine(task).updateState(PENDING);
+          updateTaskAndExternalState(
+              Tasks.id(task),
+              Optional.of(task),
+              PENDING,
+              Optional.<String>absent());
         }
       }
     });
@@ -197,54 +147,55 @@ public class StateManagerImpl implements StateManager {
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
-      @Override
-      public Boolean apply(TaskStateMachine stateMachine) {
-        return stateMachine.updateState(newState, auditMessage);
-      }
-    });
+    return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
   }
 
   @Override
   public IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts) {
+      final String taskId,
+      final String slaveHost,
+      final SlaveID slaveId,
+      final Set<Integer> assignedPorts) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
+    checkNotNull(slaveId);
     checkNotNull(assignedPorts);
 
-    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
-    return mutation.getAssignedTask();
-  }
-
-  private boolean changeState(
-      final String taskId,
-      final Optional<ScheduleStatus> casState,
-      final Function<TaskStateMachine, Boolean> stateChange) {
-
-    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
-      @Override public Boolean apply(MutableStoreProvider storeProvider) {
-        IScheduledTask task = Iterables.getOnlyElement(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-            null);
-        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
-          return false;
-        }
+    return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+      @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+        boolean success = updateTaskAndExternalState(
+            Optional.<ScheduleStatus>absent(),
+            taskId,
+            ASSIGNED,
+            Optional.<String>absent());
+
+        Preconditions.checkState(
+            success,
+            "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+        Query.Builder query = Query.taskScoped(taskId);
+        storeProvider.getUnsafeTaskStore().mutateTasks(query,
+            new Function<IScheduledTask, IScheduledTask>() {
+              @Override
+              public IScheduledTask apply(IScheduledTask task) {
+                ScheduledTask builder = task.newBuilder();
+                AssignedTask assigned = builder.getAssignedTask();
+                assigned.setAssignedPorts(
+                    getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+                assigned.setSlaveHost(slaveHost)
+                    .setSlaveId(slaveId.getValue());
+                return IScheduledTask.build(builder);
+              }
+            });
 
-        return stateChange.apply(getStateMachine(taskId, task));
+        return Iterables.getOnlyElement(
+            Iterables.transform(
+                storeProvider.getTaskStore().fetchTasks(query),
+                Tasks.SCHEDULED_TO_ASSIGNED));
       }
     });
   }
 
-  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
-    IAssignedTask getAssignedTask();
-  }
-
   private static Map<String, Integer> getNameMappedPorts(
       Set<String> portNames,
       Set<Integer> allocatedPorts) {
@@ -270,157 +221,218 @@ public class StateManagerImpl implements StateManager {
     return ports;
   }
 
-  private TaskAssignMutation assignHost(
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Set<Integer> assignedPorts) {
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            LOG.log(Level.SEVERE, "Failed to get self hostname.");
+            throw Throwables.propagate(e);
+          }
+        }
+      });
 
-    final TaskMutation mutation = new TaskMutation() {
-      @Override public IScheduledTask apply(IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        AssignedTask assigned = builder.getAssignedTask();
-        assigned.setAssignedPorts(
-            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-        assigned.setSlaveHost(slaveHost)
-            .setSlaveId(slaveId.getValue());
-        return IScheduledTask.build(builder);
-      }
-    };
+  private boolean updateTaskAndExternalState(
+      final Optional<ScheduleStatus> casState,
+      final String taskId,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
 
-    return new TaskAssignMutation() {
-      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
-      @Override public IAssignedTask getAssignedTask() {
-        return assignedTask.get();
-      }
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null));
 
-      @Override public Boolean apply(final TaskStateMachine stateMachine) {
-        TaskMutation wrapper = new TaskMutation() {
-          @Override public IScheduledTask apply(IScheduledTask task) {
-            IScheduledTask mutated = mutation.apply(task);
-            Preconditions.checkState(
-                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
-                "More than one result was found for an identity query.");
-            return mutated;
-          }
-        };
-        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+        // CAS operation fails if the task does not exist, or the states don't match.
+        if (casState.isPresent()
+            && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+          return false;
+        }
+
+        return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
       }
-    };
+    });
   }
 
-  private void processWorkQueueInWriteOperation(
-      SideEffectWork<?, ?> sideEffectWork,
-      MutableStoreProvider storeProvider) {
-
-    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
-      final TaskStateMachine stateMachine = work.stateMachine;
-
-      if (work.command == WorkCommand.KILL) {
-        driver.killTask(stateMachine.getTaskId());
-      } else {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        String taskId = stateMachine.getTaskId();
-        Query.Builder idQuery = Query.taskScoped(taskId);
-
-        switch (work.command) {
-          case RESCHEDULE:
-            ScheduledTask builder =
-                Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
-            builder.getAssignedTask().unsetSlaveId();
-            builder.getAssignedTask().unsetSlaveHost();
-            builder.getAssignedTask().unsetAssignedPorts();
-            builder.unsetTaskEvents();
-            builder.setAncestorId(taskId);
-            String newTaskId = taskIdGenerator.generate(
-                ITaskConfig.build(builder.getAssignedTask().getTask()),
-                builder.getAssignedTask().getInstanceId());
-            builder.getAssignedTask().setTaskId(newTaskId);
-
-            LOG.info("Task being rescheduled: " + taskId);
-
-            IScheduledTask task = IScheduledTask.build(builder);
-            taskStore.saveTasks(ImmutableSet.of(task));
-
-            createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
-            ITaskConfig taskInfo = task.getAssignedTask().getTask();
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskRescheduled(
-                    taskInfo.getOwner().getRole(),
-                    taskInfo.getJobName(),
-                    task.getAssignedTask().getInstanceId()));
-            break;
-
-          case UPDATE_STATE:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return work.mutation.apply(
-                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
-              }
-            });
-            sideEffectWork.addTaskEvent(
-                PubsubEvent.TaskStateChange.transition(
-                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
-                    stateMachine.getPreviousState()));
-            break;
-
-          case DELETE:
-            deleteTasks(ImmutableSet.of(taskId));
-            break;
-
-          case INCREMENT_FAILURES:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return IScheduledTask.build(
-                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
-              }
-            });
-            break;
+  private static final Function<SideEffect, SideEffect.Action> GET_ACTION =
+      new Function<SideEffect, Action>() {
+        @Override public Action apply(SideEffect sideEffect) {
+          return sideEffect.getAction();
+        }
+      };
+
+  private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+      Action.INCREMENT_FAILURES,
+      Action.SAVE_STATE,
+      Action.STATE_CHANGE,
+      Action.RESCHEDULE,
+      Action.KILL,
+      Action.DELETE);
+  static {
+    // Sanity check to ensure no actions are missing.
+    Preconditions.checkState(
+        ImmutableSet.copyOf(ACTIONS_IN_ORDER).equals(ImmutableSet.copyOf(Action.values())),
+        "Not all actions are included in ordering.");
+  }
+
+  // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+  // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+  // (thus not carrying forward the failure increment).
+  private static final Ordering<SideEffect> ACTION_ORDER =
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
 
-          default:
-            LOG.severe("Unrecognized work command type " + work.command);
+  private boolean updateTaskAndExternalState(
+      final String taskId,
+      // Note: This argument is deliberately non-final, and should not be made final.
+      // This is because using the captured value within the storage operation below is
+      // highly risky, since it doesn't necessarily represent the value in storage.
+      // As a result, it would be easy to accidentally clobber mutations.
+      Optional<IScheduledTask> task,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
+
+    if (task.isPresent()) {
+      Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+    }
+
+    final List<PubsubEvent> events = Lists.newArrayList();
+
+    final TaskStateMachine stateMachine = task.isPresent()
+        ? new TaskStateMachine(task.get())
+        : new TaskStateMachine(taskId);
+
+    boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        TransitionResult result = stateMachine.updateState(targetState);
+        Query.Builder query = Query.taskScoped(taskId);
+
+        for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+          Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+              Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+          switch (sideEffect.getAction()) {
+            case KILL:
+              driver.killTask(taskId);
+              break;
+
+            case RESCHEDULE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+              LOG.info("Task being rescheduled: " + taskId);
+
+              ScheduledTask builder = upToDateTask.get().newBuilder();
+              builder.setStatus(INIT);
+              builder.getAssignedTask().unsetSlaveId();
+              builder.getAssignedTask().unsetSlaveHost();
+              builder.getAssignedTask().unsetAssignedPorts();
+              builder.unsetTaskEvents();
+              builder.setAncestorId(taskId);
+              String newTaskId = taskIdGenerator.generate(
+                  ITaskConfig.build(builder.getAssignedTask().getTask()),
+                  builder.getAssignedTask().getInstanceId());
+              builder.getAssignedTask().setTaskId(newTaskId);
+
+              IScheduledTask newTask = IScheduledTask.build(builder);
+              storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+              updateTaskAndExternalState(
+                  newTaskId,
+                  Optional.of(newTask),
+                  PENDING, Optional.of("Rescheduled"));
+
+              ITaskConfig taskInfo = newTask.getAssignedTask().getTask();
+              events.add(
+                  new PubsubEvent.TaskRescheduled(
+                      taskInfo.getOwner().getRole(),
+                      taskInfo.getJobName(),
+                      newTask.getAssignedTask().getInstanceId()));
+              break;
+
+            case SAVE_STATE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  ScheduledTask mutableTask = task.newBuilder();
+                  mutableTask.setStatus(stateMachine.getState());
+                  mutableTask.addToTaskEvents(new TaskEvent()
+                      .setTimestamp(clock.nowMillis())
+                      .setStatus(targetState)
+                      .setMessage(transitionMessage.orNull())
+                      .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+                  return IScheduledTask.build(mutableTask);
+                }
+              });
+              events.add(
+                  PubsubEvent.TaskStateChange.transition(
+                      Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+                      stateMachine.getPreviousState()));
+              break;
+
+            case DELETE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+              break;
+
+            case INCREMENT_FAILURES:
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  return IScheduledTask.build(
+                      task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+                }
+              });
+              break;
+
+            case STATE_CHANGE:
+              updateTaskAndExternalState(
+                  Optional.<ScheduleStatus>absent(),
+                  taskId,
+                  sideEffect.getNextState().get(),
+                  Optional.<String>absent());
+              break;
+
+            default:
+              throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+          }
         }
+
+        return result.isSuccess();
       }
+    });
+
+    // Note: Delaying events until after the write operation is somewhat futile, since the state
+    // may actually not be written to durable store (e.g. if this is a nested transaction).
+    // Ideally, Storage would add a facility to attach side-effects that are performed after the
+    // outer-most transaction completes (meaning state has been durably persisted).
+    for (PubsubEvent event : events) {
+      eventSink.post(event);
     }
+
+    return success;
   }
 
   @Override
   public void deleteTasks(final Set<String> taskIds) {
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(final MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
-        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
-        taskStore.deleteTasks(taskIds);
+        eventSink.post(deleteTasks(storeProvider, taskIds));
       }
     });
   }
 
-  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-    if (task != null) {
-      return createStateMachine(task, task.getStatus());
-    }
-
-    // The task is unknown, not present in storage.
-    TaskStateMachine stateMachine = new TaskStateMachine(
-        taskId,
-        null,
-        workSink,
-        clock,
-        INIT);
-    stateMachine.updateState(UNKNOWN);
-    return stateMachine;
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task) {
-    return createStateMachine(task, INIT);
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-    return new TaskStateMachine(
-        Tasks.id(task),
-        task,
-        workSink,
-        clock,
-        initialState);
+  private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+    TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+    Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+    taskStore.deleteTasks(taskIds);
+    return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..cd0899c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.twitter.common.base.Closure;
 import com.twitter.common.base.Closures;
 import com.twitter.common.base.Command;
 import com.twitter.common.base.MorePreconditions;
 import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.StateMachine;
 import com.twitter.common.util.StateMachine.Rule;
 import com.twitter.common.util.StateMachine.Transition;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
 /**
  * State machine for a task.
  * <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * to different state transitions.  These responses are externally communicated by populating a
  * provided work queue.
  * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- *     abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to force the one-time-use nature.  This is probably best done
+ * by hiding the constructor and exposing only a static function to transition a task and get the
+ * resulting actions.
  */
 class TaskStateMachine {
   private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,183 +80,94 @@ class TaskStateMachine {
   private static final AtomicLong ILLEGAL_TRANSITIONS =
       Stats.exportLong("scheduler_illegal_task_state_transitions");
 
-  // Re-declarations of statuses as wrapped state objects.
-  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
-  private static final State FAILED = State.create(ScheduleStatus.FAILED);
-  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
-  private static final State INIT = State.create(ScheduleStatus.INIT);
-  private static final State KILLED = State.create(ScheduleStatus.KILLED);
-  private static final State KILLING = State.create(ScheduleStatus.KILLING);
-  private static final State LOST = State.create(ScheduleStatus.LOST);
-  private static final State PENDING = State.create(ScheduleStatus.PENDING);
-  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
-  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
-  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
-  private static final State STARTING = State.create(ScheduleStatus.STARTING);
-  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
-  @VisibleForTesting
-  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
-        }
-      });
-
-  private final String taskId;
-  private final WorkSink workSink;
-  private final StateMachine<State> stateMachine;
+  private final StateMachine<ScheduleStatus> stateMachine;
   private ScheduleStatus previousState = null;
-  private final Clock clock;
-
-  /**
-   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
-   * States must be equal for them to be considered equal.
-   */
-  private static class State {
-    private final ScheduleStatus state;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-      this.state = state;
-      this.mutation = mutation;
-    }
-
-    static State create(ScheduleStatus status) {
-      return create(status, Functions.<IScheduledTask>identity());
-    }
-
-    static State create(
-        ScheduleStatus status,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      return new State(status, mutation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof State)) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      State other = (State) o;
-      return state == other.state;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder()
-          .append(state)
-          .toHashCode();
-    }
 
-    @Override
-    public String toString() {
-      return state.toString();
-    }
-
-    private ScheduleStatus getState() {
-      return state;
-    }
-
-    private Function<IScheduledTask, IScheduledTask> getMutation() {
-      return mutation;
-    }
-  }
+  private final Set<SideEffect> sideEffects = Sets.newHashSet();
 
   /**
-   * A write-only work acceptor.
+   * Creates a new task state machine representing a non-existent task.  This allows for consistent
+   * state-reconciliation actions when the external system disagrees with the scheduler.
+   *
+   * @param name Name of the state machine, for logging.
    */
-  public interface WorkSink {
-    /**
-     * Appends external work that must be performed for a state machine transition to be fully
-     * complete.
-     *
-     * @param work Description of the work to be performed.
-     * @param stateMachine The state machine that the work is associated with.
-     * @param mutation Mutate operation to perform along with the state transition.
-     */
-    void addWork(
-        WorkCommand work,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation);
+  public TaskStateMachine(String name) {
+    this(name, Optional.<IScheduledTask>absent());
   }
 
   /**
-   * Creates a new task state machine.
-   *
-   * @param taskId ID of the task managed by this state machine.
+   * Creates a new task state machine representing an existent task.  The state machine will be
+   * named with the task's ID.
+   *
    * @param task Read-only task that this state machine manages.
-   * @param workSink Work sink to receive transition response actions
-   * @param clock Clock to use for reading the current time.
-   * @param initialState The state to begin the state machine at.  All legal transitions will be
-   *     added, but this allows the state machine to 'skip' states, for instance when a task is
-   *     loaded from a persistent store.
    */
-  public TaskStateMachine(
-      final String taskId,
-      final IScheduledTask task,
-      final WorkSink workSink,
-      final Clock clock,
-      final ScheduleStatus initialState) {
-
-    this.taskId = MorePreconditions.checkNotBlank(taskId);
-    this.workSink = checkNotNull(workSink);
-    this.clock = checkNotNull(clock);
-    checkNotNull(initialState);
-
-    @SuppressWarnings("unchecked")
-    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
-        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
-        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
-            addWorkClosure(WorkCommand.KILL)),
-        /* Remove a terminated task that is remotely removed. */
-        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
-    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
-      @SuppressWarnings("fallthrough")
-      @Override public void execute(Transition<State> transition) {
-        switch (transition.getTo().getState()) {
-          case ASSIGNED:
-          case STARTING:
-          case RUNNING:
-            addWork(WorkCommand.KILL);
-            break;
-
-          case LOST:
-            addWork(WorkCommand.KILL);
-            // fall through
-
-          case FINISHED:
-          case FAILED:
-          case KILLED:
-            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
-            break;
-
-          case UNKNOWN:
-            updateState(ScheduleStatus.LOST);
-            break;
-
-          default:
-            // No-op.
-        }
-      }
-    };
+  public TaskStateMachine(IScheduledTask task) {
+    this(Tasks.id(task), Optional.of(task));
+  }
+
+  private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+    MorePreconditions.checkNotBlank(name);
+    checkNotNull(task);
+
+    final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+    Preconditions.checkState((initialState == UNKNOWN) == !task.isPresent());
+
+    Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+        ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+            // Kill a task that we believe to be terminated when an attempt is made to revive.
+            .add(Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING), addWorkClosure(KILL)))
+            // Remove a terminated task that is remotely removed.
+            .add(Closures.filter(Transition.to(UNKNOWN), addWorkClosure(DELETE)))
+            .build());
+
+    final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+        new Closure<Transition<ScheduleStatus>>() {
+          @Override public void execute(Transition<ScheduleStatus> transition) {
+            switch (transition.getTo()) {
+              case ASSIGNED:
+                addFollowup(KILL);
+                break;
+
+              case STARTING:
+                addFollowup(KILL);
+                break;
+
+              case RUNNING:
+                addFollowup(KILL);
+                break;
+
+              case LOST:
+                addFollowup(KILL);
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FINISHED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FAILED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case KILLED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case UNKNOWN:
+                addFollowupTransition(LOST);
+                break;
+
+              default:
+                // No-op.
+            }
+          }
+        };
 
     // To be called on a task transitioning into the FINISHED state.
     final Command rescheduleIfService = new Command() {
       @Override public void execute() {
-        if (task.getAssignedTask().getTask().isIsService()) {
-          addWork(WorkCommand.RESCHEDULE);
+        if (task.get().getAssignedTask().getTask().isIsService()) {
+          addFollowup(RESCHEDULE);
         }
       }
     };
@@ -248,24 +175,26 @@ class TaskStateMachine {
     // To be called on a task transitioning into the FAILED state.
     final Command incrementFailuresMaybeReschedule = new Command() {
       @Override public void execute() {
-        addWork(WorkCommand.INCREMENT_FAILURES);
+        addFollowup(INCREMENT_FAILURES);
 
         // Max failures is ignored for service task.
-        boolean isService = task.getAssignedTask().getTask().isIsService();
+        boolean isService = task.get().getAssignedTask().getTask().isIsService();
 
         // Max failures is ignored when set to -1.
-        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
-        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
-          addWork(WorkCommand.RESCHEDULE);
+        int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+        boolean belowMaxFailures =
+            (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+        if (isService || belowMaxFailures) {
+          addFollowup(RESCHEDULE);
         } else {
-          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+          LOG.info("Task " + name + " reached failure limit, not rescheduling");
         }
       }
     };
 
-    stateMachine = StateMachine.<State>builder(taskId)
+    stateMachine = StateMachine.<ScheduleStatus>builder(name)
         .logTransitions()
-        .initialState(State.create(initialState))
+        .initialState(initialState)
         .addState(
             Rule.from(INIT)
                 .to(PENDING, UNKNOWN))
@@ -273,11 +202,11 @@ class TaskStateMachine {
             Rule.from(PENDING)
                 .to(ASSIGNED, KILLING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case KILLING:
-                            addWork(WorkCommand.DELETE);
+                            addFollowup(DELETE);
                             break;
 
                           default:
@@ -291,16 +220,15 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
                     KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -308,25 +236,24 @@ class TaskStateMachine {
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            // fall through
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(RESCHEDULE);
+                            addFollowup(KILL);
                             break;
 
-                          case UNKNOWN:
+                          case KILLING:
+                            addFollowup(KILL);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -335,20 +262,19 @@ class TaskStateMachine {
             Rule.from(STARTING)
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -356,25 +282,25 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
                             // The slave previously acknowledged that it had the task, and now
                             // stopped reporting it.
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -383,20 +309,19 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -404,19 +329,19 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
                            default:
@@ -460,23 +385,25 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<State>>() {
-              @Override public void execute(final Transition<State> transition) {
-                ScheduleStatus from = transition.getFrom().getState();
-                ScheduleStatus to = transition.getTo().getState();
-
-                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
-                    // Prevent an update when killing a pending task, since the task is deleted
-                    // prior to the update.
-                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
-                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
-                } else if (!transition.isAllowed()) {
-                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
-                  ILLEGAL_TRANSITIONS.incrementAndGet();
-                }
-
+            new Closure<Transition<ScheduleStatus>>() {
+              @Override public void execute(final Transition<ScheduleStatus> transition) {
                 if (transition.isValidStateChange()) {
+                  ScheduleStatus from = transition.getFrom();
+                  ScheduleStatus to = transition.getTo();
+
+                  // TODO(wfarner): Clean up this hack.  This is here to suppress unnecessary work
+                  // (save followed by delete), but it shows a wart with this catch-all behavior.
+                  // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+                  boolean pendingDeleteHack = !((from == PENDING) && (to == KILLING));
+
+                  // Don't bother saving state of a task that is being removed.
+                  if ((to != UNKNOWN) && pendingDeleteHack) {
+                    addFollowup(SAVE_STATE);
+                  }
                   previousState = from;
+                } else {
+                  LOG.severe("Illegal state transition attempted: " + transition);
+                  ILLEGAL_TRANSITIONS.incrementAndGet();
                 }
               }
             }
@@ -490,56 +417,67 @@ class TaskStateMachine {
         .build();
   }
 
-  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-    return new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> item) {
-        addWork(work);
-      }
-    };
+  private void addFollowup(Action action) {
+    addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
   }
 
-  private void addWork(WorkCommand work) {
-    addWork(work, Functions.<IScheduledTask>identity());
+  private void addFollowupTransition(ScheduleStatus status) {
+    addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
   }
 
-  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-    LOG.info("Adding work command " + work + " for " + this);
-    workSink.addWork(work, TaskStateMachine.this, mutation);
+  private void addFollowup(SideEffect action) {
+    LOG.info("Adding work command " + action + " for " + this);
+    sideEffects.add(action);
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status) {
-    return updateState(status, Functions.<IScheduledTask>identity());
+  private Closure<Transition<ScheduleStatus>> addWorkClosure(final Action action) {
+    return new Closure<Transition<ScheduleStatus>>() {
+      @Override public void execute(Transition<ScheduleStatus> item) {
+        addFollowup(action);
+      }
+    };
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The (optional) audit message to associate with the transition.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
-    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
-  }
+  public static class TransitionResult {
+    private final boolean success;
+    private final Set<SideEffect> sideEffects;
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
-   *
-   * @param status Status to apply to the task.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation) {
+    public TransitionResult(boolean success, Set<SideEffect> sideEffects) {
+      this.success = success;
+      this.sideEffects = Preconditions.checkNotNull(sideEffects);
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+
+    public Set<SideEffect> getSideEffects() {
+      return sideEffects;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TransitionResult)) {
+        return false;
+      }
+
+      TransitionResult other = (TransitionResult) o;
+      return (success == other.success)
+          && Objects.equal(sideEffects, other.sideEffects);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(success, sideEffects);
+    }
 
-    return updateState(status, mutation, Optional.<String>absent());
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("success", success)
+          .add("sideEffects", sideEffects)
+          .toString();
+    }
   }
 
   /**
@@ -548,59 +486,35 @@ class TaskStateMachine {
    * will be appended to the work queue.
    *
    * @param status Status to apply to the task.
-   * @param auditMessage The audit message to associate with the transition.
-   * @param mutation Mutate operation to perform while updating the task.
    * @return {@code true} if the state change was allowed, {@code false} otherwise.
    */
-  public synchronized boolean updateState(
-      final ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation,
-      final Optional<String> auditMessage) {
-
+  public synchronized TransitionResult updateState(final ScheduleStatus status) {
     checkNotNull(status);
-    checkNotNull(mutation);
-    checkNotNull(auditMessage);
+    Preconditions.checkState(sideEffects.isEmpty());
 
     /**
      * Don't bother applying noop state changes.  If we end up modifying task state without a
      * state transition (e.g. storing resource consumption of a running task), we need to find
      * a different way to suppress noop transitions.
      */
-    if (stateMachine.getState().getState() != status) {
-      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
-          new Function<IScheduledTask, IScheduledTask>() {
-            @Override public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask builder = task.newBuilder();
-              builder.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(status)
-                  .setMessage(auditMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(builder);
-            }
-          });
-      return stateMachine.transition(State.create(status, operation));
+    if (stateMachine.getState() == status) {
+      return new TransitionResult(false, ImmutableSet.<SideEffect>of());
     }
 
-    return false;
+    boolean success = stateMachine.transition(status);
+    Set<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+    sideEffects.clear();
+    return new TransitionResult(success, transitionEffects);
   }
 
   /**
    * Fetch the current state from the state machine.
+   * TODO(wfarner): Consider removing this; the caller should already know the state.
    *
    * @return The current state.
    */
   public synchronized ScheduleStatus getState() {
-    return stateMachine.getState().getState();
-  }
-
-  /**
-   * Gets the ID for the task that this state machine manages.
-   *
-   * @return The state machine's task ID.
-   */
-  public String getTaskId() {
-    return taskId;
+    return stateMachine.getState();
   }
 
   /**
@@ -616,6 +530,6 @@ class TaskStateMachine {
 
   @Override
   public String toString() {
-    return getTaskId();
+    return stateMachine.getName();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
-  // Send an instruction for the runner of this task to kill the task.
-  KILL,
-  // Create a new state machine with a copy of this task.
-  RESCHEDULE,
-  // Update the task's state (schedule status) in the persistent store to match the state machine.
-  UPDATE_STATE,
-  // Delete this task from the persistent store.
-  DELETE,
-  // Increment the failure count for this task.
-  INCREMENT_FAILURES
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..26468ce 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -200,6 +200,10 @@ public interface Storage {
 
   /**
    * Executes the unit of mutating {@code work}.
+   * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+   * performed after completion of the outer-most transaction.  As it stands, it's somewhat
+   * futile to try to achieve this within a transaction, since the local code does not know
+   * if the current transaction is nested.
    *
    * @param work The unit of work to execute.
    * @param <T> The type of result this unit of work produces.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
     /**
      * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
      * skipped, and no corresponding task will be returned.
+     * TODO(wfarner): Consider adding a non-batch variant of this, since that is the more common
+     * use case and it spares the caller from worrying about a bad query having broad impact.
      *
      * @param query Query to match tasks against.
      * @param mutator The mutate operation.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b93e47f..af20e82 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -40,6 +41,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskRescheduled;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -50,14 +52,14 @@ import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
-import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
@@ -65,6 +67,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StateManagerImplTest extends EasyMockTest {
@@ -90,11 +93,6 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
   }
 
-  @After
-  public void validateCompletion() {
-    assertTrue(stateManager.getStorage().getEvents().isEmpty());
-  }
-
   private static class StateChangeMatcher implements IArgumentMatcher {
     private final String taskId;
     private final ScheduleStatus from;
@@ -171,7 +169,7 @@ public class StateManagerImplTest extends EasyMockTest {
         .setStatus(PENDING)
         .setTaskEvents(ImmutableList.of(new TaskEvent()
             .setTimestamp(clock.nowMillis())
-            .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+            .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
             .setStatus(PENDING)))
         .setAssignedTask(new AssignedTask()
             .setInstanceId(3)
@@ -265,6 +263,87 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(unknownTask, RUNNING);
   }
 
+  @Test
+  public void testIncrementFailureCount() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    expectStateTransitions(taskId2, INIT, PENDING);
+    eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, FAILED);
+    IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+    assertEquals(1, rescheduledTask.getFailureCount());
+  }
+
+  @Test
+  public void testDoubleTransition() {
+    // Tests that a transition inducing another transition (STATE_CHANGE action) is performed.
+
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    expectStateTransitions(taskId2, INIT, PENDING);
+    eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, UNKNOWN);
+  }
+
+  @Test
+  public void testCasTaskPresent() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A);
+    assertFalse(stateManager.changeState(
+        taskId,
+        Optional.of(PENDING),
+        RUNNING,
+        Optional.<String>absent()));
+    assertTrue(stateManager.changeState(
+        taskId,
+        Optional.of(ASSIGNED),
+        FAILED,
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testCasTaskNotFound() {
+    control.replay();
+
+    assertFalse(stateManager.changeState(
+        "a",
+        Optional.of(PENDING),
+        ASSIGNED,
+        Optional.<String>absent()));
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,