You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 01:15:33 UTC

[2/2] git commit: Refactor StateManagerImpl and TaskStateMachine for less code and better readability.

Refactor StateManagerImpl and TaskStateMachine for less code and better
readability.

The "big picture" for this change is that the closures inside
TaskStateMachine no longer drop items onto a work queue that feeds back into
StateManagerImpl.  Instead, TaskStateMachine returns these actions in a
TransitionResult.  I intend to improve this further in the future by exposing
only a helper function in TaskStateMachine, to guarantee one-time-use semantics.

Reviewed at https://reviews.apache.org/r/16873/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/3870df32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/3870df32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/3870df32

Branch: refs/heads/master
Commit: 3870df32bec25a2d0368b44c4760607942315ab3
Parents: e159880
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 28 16:09:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 28 16:09:49 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/state/SideEffect.java      | 106 ++++
 .../scheduler/state/SideEffectStorage.java      | 169 ------
 .../scheduler/state/StateManagerImpl.java       | 537 ++++++++++---------
 .../scheduler/state/TaskStateMachine.java       | 520 +++++++-----------
 .../scheduler/state/TransitionResult.java       |  73 +++
 .../aurora/scheduler/state/WorkCommand.java     |  33 --
 .../aurora/scheduler/storage/Storage.java       |   4 +
 .../aurora/scheduler/storage/TaskStore.java     |   2 +
 .../scheduler/state/StateManagerImplTest.java   |  97 +++-
 .../scheduler/state/TaskStateMachineTest.java   | 512 ++++++++++++------
 10 files changed, 1084 insertions(+), 969 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..04e6c7e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * A side effect of a task state machine transition: an {@link Action} to carry out, and an
+ * optional next state, which must be present for {@link Action#STATE_CHANGE}.
+ */
+class SideEffect {
+  private final Action action;
+  private final Optional<ScheduleStatus> nextState;
+
+  SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+    this.action = action;
+    if (action == Action.STATE_CHANGE) {
+      Preconditions.checkArgument(
+          nextState.isPresent(),
+          "A next state must be provided for a state change action.");
+    }
+    this.nextState = nextState;
+  }
+
+  public Action getAction() {
+    return action;
+  }
+
+  public Optional<ScheduleStatus> getNextState() {
+    return nextState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equal(action, other.action)
+        && Objects.equal(nextState, other.nextState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(action, nextState);
+  }
+
+  @Override
+  public String toString() {
+    if (nextState.isPresent()) {
+      return action.toString() + " " + nextState.get();
+    } else {
+      return action.toString();
+    }
+  }
+
+  enum Action {
+    /**
+     * Send an instruction for the runner of this task to kill the task.
+     */
+    KILL,
+
+    /**
+     * Create a new state machine with a copy of this task.
+     */
+    RESCHEDULE,
+
+    /**
+     * Update the task's state (schedule status) in the persistent store to match the state machine.
+     */
+    SAVE_STATE,
+
+    /**
+     * Delete this task from the persistent store.
+     */
+    DELETE,
+
+    /**
+     * Increment the failure count for this task.
+     */
+    INCREMENT_FAILURES,
+
+    /**
+     * Perform an additional state change on the task, to {@link SideEffect#getNextState()}.
+     */
+    STATE_CHANGE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
-  private final Queue<PubsubEvent> events = Lists.newLinkedList();
-  @VisibleForTesting
-  Queue<PubsubEvent> getEvents() {
-    return events;
-  }
-
-  private AtomicBoolean inOperation = new AtomicBoolean(false);
-
-  private final Storage storage;
-  private final OperationFinalizer operationFinalizer;
-  private final EventSink eventSink;
-
-  interface OperationFinalizer {
-    /**
-     * Performs any work necessary to complete the operation.
-     * This is executed in the context of a write operation, immediately after the work
-     * executes normally.
-     * NOTE: At present, this is executed for every nesting level of operations, rather than
-     * at the completion of the top-level operation.
-     * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
-     * for more detail.
-     *
-     * @param work Work to finalize.
-     * @param storeProvider Mutable store reference.
-     */
-    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
-  }
-
-  SideEffectStorage(
-      Storage storage,
-      OperationFinalizer operationFinalizer,
-      EventSink eventSink) {
-
-    this.storage = checkNotNull(storage);
-    this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.eventSink = checkNotNull(eventSink);
-  }
-
-  /**
-   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
-   *
-   * @param work Work to perform.
-   * @param <T> Work return type
-   * @param <E> Work exception type.
-   * @return The work return value.
-   * @throws E The work exception.
-   */
-  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-    return storage.write(executeSideEffectsAfter(work));
-  }
-
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  /**
-   * Work that has side effects external to the storage system.
-   * Work may add side effect and pubsub events, which will be executed/sent upon normal
-   * completion of the operation.
-   *
-   * @param <T> Work return type.
-   * @param <E> Work exception type.
-   */
-  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
-    protected final void addTaskEvent(PubsubEvent notice) {
-      Preconditions.checkState(inOperation.get());
-      events.add(Preconditions.checkNotNull(notice));
-    }
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions.
-   *
-   * @param <T>   Work return type.
-   */
-  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
-  }
-
-  /**
-   * Work with side effects that does not have a return value.
-   *
-   * @param <E> Work exception type.
-   */
-  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-      execute(storeProvider);
-      return null;
-    }
-
-    abstract void execute(MutableStoreProvider storeProvider) throws E;
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions or have a return
-   * value.
-   */
-  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
-  }
-
-  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
-      final SideEffectWork<T, E> work) {
-
-    return new MutateWork<T, E>() {
-      @Override public T apply(MutableStoreProvider storeProvider) throws E {
-        boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
-        try {
-          T result = work.apply(storeProvider);
-
-          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
-          // seems wrong.  Double-check whether this is necessary, or if only the top-level
-          // operation should be executing the finalizer.  Update doc on OperationFinalizer
-          // once this is assessed.
-          operationFinalizer.finalize(work, storeProvider);
-          if (topLevelOperation) {
-            while (!events.isEmpty()) {
-              eventSink.post(events.remove());
-            }
-          }
-          return result;
-        } finally {
-          if (topLevelOperation) {
-            inOperation.set(false);
-          }
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 2b8ca09..6fee43c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,33 +15,39 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
@@ -49,9 +55,10 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -60,96 +67,27 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Iterables.transform;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
 
 /**
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
  * persisted state machine transitions, and their side-effects.
- *
- * TODO(wfarner): This class is due for an overhaul.  There are several aspects of it that could
- * probably be made much simpler.  Specifically, the workQueue is particularly difficult to reason
- * about.
  */
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  @VisibleForTesting
-  SideEffectStorage getStorage() {
-    return storage;
-  }
-
-  // Work queue to receive state machine side effect work.
-  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
-  // a delete is always processed after a state transition.
-  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
-      new Comparator<WorkEntry>() {
-        @Override public int compare(WorkEntry a, WorkEntry b) {
-          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
-            return (a.command == WorkCommand.DELETE) ? 1 : -1;
-          } else {
-            return 0;
-          }
-        }
-      });
-
-  // Adapt the work queue into a sink.
-  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
-      @Override public void addWork(
-          WorkCommand work,
-          TaskStateMachine stateMachine,
-          Function<IScheduledTask, IScheduledTask> mutation) {
-
-        workQueue.add(new WorkEntry(work, stateMachine, mutation));
-      }
-    };
-
-  private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
-      new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
-        @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
-          ITaskConfig task = entry.getValue();
-          AssignedTask assigned = new AssignedTask()
-              .setTaskId(taskIdGenerator.generate(task, entry.getKey()))
-              .setInstanceId(entry.getKey())
-              .setTask(task.newBuilder());
-          return IScheduledTask.build(new ScheduledTask()
-              .setStatus(INIT)
-              .setAssignedTask(assigned));
-        }
-      };
-
-  private final SideEffectStorage storage;
+  private final Storage storage;
   private final Clock clock;
   private final Driver driver;
   private final TaskIdGenerator taskIdGenerator;
+  private final EventSink eventSink;
   private final RescheduleCalculator rescheduleCalculator;
 
-  /**
-   * An item of work on the work queue.
-   */
-  private static class WorkEntry {
-    private final WorkCommand command;
-    private final TaskStateMachine stateMachine;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    WorkEntry(
-        WorkCommand command,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      this.command = command;
-      this.stateMachine = stateMachine;
-      this.mutation = mutation;
-    }
-  }
-
   @Inject
   StateManagerImpl(
       final Storage storage,
@@ -159,21 +97,22 @@ public class StateManagerImpl implements StateManager {
       EventSink eventSink,
       RescheduleCalculator rescheduleCalculator) {
 
-    checkNotNull(storage);
+    this.storage = checkNotNull(storage);
     this.clock = checkNotNull(clock);
-
-    OperationFinalizer finalizer = new OperationFinalizer() {
-      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
-        processWorkQueueInWriteOperation(work, store);
-      }
-    };
-
-    this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
+    this.eventSink = checkNotNull(eventSink);
     this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+  }
 
-    Stats.exportSize("work_queue_depth", workQueue);
+  private IScheduledTask createTask(int instanceId, ITaskConfig template) {
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId(taskIdGenerator.generate(template, instanceId))
+        .setInstanceId(instanceId)
+        .setTask(template.newBuilder());
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(INIT)
+        .setAssignedTask(assigned));
   }
 
   @Override
@@ -181,15 +120,23 @@ public class StateManagerImpl implements StateManager {
     checkNotNull(tasks);
 
     // Done outside the write transaction to minimize the work done inside a transaction.
-    final Set<IScheduledTask> scheduledTasks =
-        ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
+    final Set<IScheduledTask> scheduledTasks = FluentIterable.from(tasks.entrySet())
+        .transform(new Function<Entry<Integer, ITaskConfig>, IScheduledTask>() {
+          @Override public IScheduledTask apply(Entry<Integer, ITaskConfig> entry) {
+            return createTask(entry.getKey(), entry.getValue());
+          }
+        }).toSet();
 
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
         for (IScheduledTask task : scheduledTasks) {
-          createStateMachine(task).updateState(PENDING);
+          updateTaskAndExternalState(
+              Tasks.id(task),
+              Optional.of(task),
+              PENDING,
+              Optional.<String>absent());
         }
       }
     });
@@ -202,54 +149,55 @@ public class StateManagerImpl implements StateManager {
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
-      @Override
-      public Boolean apply(TaskStateMachine stateMachine) {
-        return stateMachine.updateState(newState, auditMessage);
-      }
-    });
+    return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
   }
 
   @Override
   public IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts) {
+      final String taskId,
+      final String slaveHost,
+      final SlaveID slaveId,
+      final Set<Integer> assignedPorts) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
+    checkNotNull(slaveId);
     checkNotNull(assignedPorts);
 
-    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
-    return mutation.getAssignedTask();
-  }
-
-  private boolean changeState(
-      final String taskId,
-      final Optional<ScheduleStatus> casState,
-      final Function<TaskStateMachine, Boolean> stateChange) {
-
-    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
-      @Override public Boolean apply(MutableStoreProvider storeProvider) {
-        IScheduledTask task = Iterables.getOnlyElement(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-            null);
-        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
-          return false;
-        }
+    return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+      @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+        boolean success = updateTaskAndExternalState(
+            Optional.<ScheduleStatus>absent(),
+            taskId,
+            ASSIGNED,
+            Optional.<String>absent());
+
+        Preconditions.checkState(
+            success,
+            "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+        Query.Builder query = Query.taskScoped(taskId);
+        storeProvider.getUnsafeTaskStore().mutateTasks(query,
+            new Function<IScheduledTask, IScheduledTask>() {
+              @Override
+              public IScheduledTask apply(IScheduledTask task) {
+                ScheduledTask builder = task.newBuilder();
+                AssignedTask assigned = builder.getAssignedTask();
+                assigned.setAssignedPorts(
+                    getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+                assigned.setSlaveHost(slaveHost)
+                    .setSlaveId(slaveId.getValue());
+                return IScheduledTask.build(builder);
+              }
+            });
 
-        return stateChange.apply(getStateMachine(taskId, task));
+        return Iterables.getOnlyElement(
+            Iterables.transform(
+                storeProvider.getTaskStore().fetchTasks(query),
+                Tasks.SCHEDULED_TO_ASSIGNED));
       }
     });
   }
 
-  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
-    IAssignedTask getAssignedTask();
-  }
-
   private static Map<String, Integer> getNameMappedPorts(
       Set<String> portNames,
       Set<Integer> allocatedPorts) {
@@ -275,164 +223,219 @@ public class StateManagerImpl implements StateManager {
     return ports;
   }
 
-  private TaskAssignMutation assignHost(
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Set<Integer> assignedPorts) {
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            LOG.log(Level.SEVERE, "Failed to get self hostname.");
+            throw Throwables.propagate(e);
+          }
+        }
+      });
 
-    final TaskMutation mutation = new TaskMutation() {
-      @Override public IScheduledTask apply(IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        AssignedTask assigned = builder.getAssignedTask();
-        assigned.setAssignedPorts(
-            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-        assigned.setSlaveHost(slaveHost)
-            .setSlaveId(slaveId.getValue());
-        return IScheduledTask.build(builder);
-      }
-    };
+  private boolean updateTaskAndExternalState(
+      final Optional<ScheduleStatus> casState,
+      final String taskId,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
 
-    return new TaskAssignMutation() {
-      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
-      @Override public IAssignedTask getAssignedTask() {
-        return assignedTask.get();
-      }
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null));
 
-      @Override public Boolean apply(final TaskStateMachine stateMachine) {
-        TaskMutation wrapper = new TaskMutation() {
-          @Override public IScheduledTask apply(IScheduledTask task) {
-            IScheduledTask mutated = mutation.apply(task);
-            Preconditions.checkState(
-                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
-                "More than one result was found for an identity query.");
-            return mutated;
-          }
-        };
-        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+        // CAS operation fails if the task does not exist, or the states don't match.
+        if (casState.isPresent()
+            && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+          return false;
+        }
+
+        return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
       }
-    };
+    });
   }
 
-  private void processWorkQueueInWriteOperation(
-      SideEffectWork<?, ?> sideEffectWork,
-      MutableStoreProvider storeProvider) {
-
-    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
-      final TaskStateMachine stateMachine = work.stateMachine;
-
-      if (work.command == WorkCommand.KILL) {
-        driver.killTask(stateMachine.getTaskId());
-      } else {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        String taskId = stateMachine.getTaskId();
-        Query.Builder idQuery = Query.taskScoped(taskId);
-
-        switch (work.command) {
-          case RESCHEDULE:
-            IScheduledTask ancestor = Iterables.getOnlyElement(taskStore.fetchTasks(idQuery));
-
-            ScheduledTask builder = ancestor.newBuilder();
-            builder.getAssignedTask().unsetSlaveId();
-            builder.getAssignedTask().unsetSlaveHost();
-            builder.getAssignedTask().unsetAssignedPorts();
-            builder.unsetTaskEvents();
-            builder.setAncestorId(taskId);
-            String newTaskId = taskIdGenerator.generate(
-                ITaskConfig.build(builder.getAssignedTask().getTask()),
-                builder.getAssignedTask().getInstanceId());
-            builder.getAssignedTask().setTaskId(newTaskId);
-
-            LOG.info("Task being rescheduled: " + taskId);
-
-            IScheduledTask task = IScheduledTask.build(builder);
-            taskStore.saveTasks(ImmutableSet.of(task));
-
-            ScheduleStatus newState;
-            String auditMessage;
-            long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(ancestor);
-            if (flapPenaltyMs > 0) {
-              newState = THROTTLED;
-              auditMessage =
-                  String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
-            } else {
-              newState = PENDING;
-              auditMessage = "Rescheduled";
-            }
-
-            createStateMachine(task).updateState(newState, Optional.of(auditMessage));
-            break;
-
-          case UPDATE_STATE:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return work.mutation.apply(
-                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
-              }
-            });
-            sideEffectWork.addTaskEvent(
-                PubsubEvent.TaskStateChange.transition(
-                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
-                    stateMachine.getPreviousState()));
-            break;
-
-          case DELETE:
-            deleteTasks(ImmutableSet.of(taskId));
-            break;
-
-          case INCREMENT_FAILURES:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return IScheduledTask.build(
-                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+  private static final Function<SideEffect, Action> GET_ACTION =
+      new Function<SideEffect, Action>() {
+        @Override public Action apply(SideEffect sideEffect) {
+          return sideEffect.getAction();
+        }
+      };
+
+  private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+      Action.INCREMENT_FAILURES,
+      Action.SAVE_STATE,
+      Action.STATE_CHANGE,
+      Action.RESCHEDULE,
+      Action.KILL,
+      Action.DELETE);
+
+  static {
+    // Sanity check to ensure no actions are missing.
+    Preconditions.checkState(
+        ImmutableSet.copyOf(ACTIONS_IN_ORDER).equals(ImmutableSet.copyOf(Action.values())),
+        "Not all actions are included in ordering.");
+  }
+
+  // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+  // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+  // (thus not carrying forward the failure increment).
+  private static final Ordering<SideEffect> ACTION_ORDER =
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
+
+  private boolean updateTaskAndExternalState(  // Moves taskId to targetState, then applies side effects.
+      final String taskId,
+      // Note: This argument is deliberately non-final, and should not be made final.
+      // This is because using the captured value within the storage operation below is
+      // highly-risky, since it doesn't necessarily represent the value in storage.
+      // As a result, it would be easy to accidentally clobber mutations.
+      Optional<IScheduledTask> task,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
+
+    if (task.isPresent()) {
+      Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+    }
+
+    final List<PubsubEvent> events = Lists.newArrayList();
+
+    final TaskStateMachine stateMachine = task.isPresent()  // NOTE(review): seeded from caller's snapshot.
+        ? new TaskStateMachine(task.get())
+        : new TaskStateMachine(taskId);
+
+    boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        TransitionResult result = stateMachine.updateState(targetState);
+        Query.Builder query = Query.taskScoped(taskId);
+
+        for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+          Optional<IScheduledTask> upToDateTask = Optional.fromNullable(  // Re-read per side effect.
+              Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+          switch (sideEffect.getAction()) {
+            case INCREMENT_FAILURES:
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  return IScheduledTask.build(
+                      task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+                }
+              });
+              break;
+
+            case SAVE_STATE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  ScheduledTask mutableTask = task.newBuilder();
+                  mutableTask.setStatus(stateMachine.getState());
+                  mutableTask.addToTaskEvents(new TaskEvent()
+                      .setTimestamp(clock.nowMillis())
+                      .setStatus(targetState)
+                      .setMessage(transitionMessage.orNull())
+                      .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+                  return IScheduledTask.build(mutableTask);
+                }
+              });
+              events.add(  // Queued; posted only after the storage write returns.
+                  PubsubEvent.TaskStateChange.transition(
+                      Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+                      stateMachine.getPreviousState()));
+              break;
+
+            case STATE_CHANGE:
+              updateTaskAndExternalState(  // NOTE(review): (casState, taskId, ...) overload not in view.
+                  Optional.<ScheduleStatus>absent(),
+                  taskId,
+                  sideEffect.getNextState().get(),
+                  Optional.<String>absent());
+              break;
+
+            case RESCHEDULE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+              LOG.info("Task being rescheduled: " + taskId);
+
+              ScheduleStatus newState;
+              String auditMessage;
+              long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
+              if (flapPenaltyMs > 0) {
+                newState = THROTTLED;
+                auditMessage =
+                    String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+              } else {
+                newState = PENDING;
+                auditMessage = "Rescheduled";
               }
-            });
-            break;
 
-          default:
-            LOG.severe("Unrecognized work command type " + work.command);
+              IScheduledTask newTask = IScheduledTask.build(createTask(
+                  upToDateTask.get().getAssignedTask().getInstanceId(),
+                  upToDateTask.get().getAssignedTask().getTask())
+                  .newBuilder()
+                  .setFailureCount(upToDateTask.get().getFailureCount())
+                  .setAncestorId(taskId));
+              storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+              updateTaskAndExternalState(  // NOTE(review): posts its events before this call's queue.
+                  Tasks.id(newTask),
+                  Optional.of(newTask),
+                  newState,
+                  Optional.of(auditMessage));
+              break;
+
+            case KILL:
+              driver.killTask(taskId);  // Issued immediately, inside the storage write.
+              break;
+
+            case DELETE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+              break;
+
+            default:
+              throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+          }
         }
+
+        return result.isSuccess();
       }
+    });
+
+    // Note (AURORA-138): Delaying events until after the write operation is somewhat futile, since
+    // the state may actually not be written to durable store
+    // (e.g. if this is a nested transaction). Ideally, Storage would add a facility to attach
+    // side-effects that are performed after the outer-most transaction completes (meaning state
+    // has been durably persisted).
+    for (PubsubEvent event : events) {
+      eventSink.post(event);
     }
+
+    return success;  // The TransitionResult success flag computed inside the write.
   }
 
   @Override
   public void deleteTasks(final Set<String> taskIds) {
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {  // Deletes the tasks and posts TasksDeleted.
       @Override protected void execute(final MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
-        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
-        taskStore.deleteTasks(taskIds);
+        eventSink.post(deleteTasks(storeProvider, taskIds));  // NOTE(review): posted inside the write.
       }
     });
   }
 
-  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-    if (task != null) {
-      return createStateMachine(task, task.getStatus());
-    }
-
-    // The task is unknown, not present in storage.
-    TaskStateMachine stateMachine = new TaskStateMachine(
-        taskId,
-        null,
-        workSink,
-        clock,
-        INIT);
-    stateMachine.updateState(UNKNOWN);
-    return stateMachine;
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task) {
-    return createStateMachine(task, INIT);
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-    return new TaskStateMachine(
-        Tasks.id(task),
-        task,
-        workSink,
-        clock,
-        initialState);
+  /**
+   * Deletes tasks from the store and describes the deletion as an event.
+   *
+   * @param storeProvider Store provider whose unsafe task store the tasks are deleted from.
+   * @param taskIds IDs of the tasks to delete.
+   * @return A {@link PubsubEvent.TasksDeleted} holding the matching tasks as they were before
+   *     deletion. The caller is responsible for posting it.
+   */
+  private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+    TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+    // Snapshot the matching tasks *before* deleting them (as the pre-refactor code did), so the
+    // event payload cannot be emptied by the deletion even if fetchTasks returns a lazy view.
+    ImmutableSet<IScheduledTask> tasks =
+        ImmutableSet.copyOf(taskStore.fetchTasks(Query.taskScoped(taskIds)));
+    taskStore.deleteTasks(taskIds);
+    return new PubsubEvent.TasksDeleted(tasks);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 11d283d..ebccc74 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.twitter.common.base.Closure;
 import com.twitter.common.base.Closures;
 import com.twitter.common.base.Command;
 import com.twitter.common.base.MorePreconditions;
 import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.StateMachine;
 import com.twitter.common.util.StateMachine.Rule;
 import com.twitter.common.util.StateMachine.Transition;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
 /**
  * State machine for a task.
  * <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 * to different state transitions.  These responses are returned to the caller as side effects in a
 * {@link TransitionResult}, which the caller is responsible for applying.
  * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- *     abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to force the one-time-use nature.  This is probably best done
+ * by hiding the constructor and exposing only a static function to transition a task and get the
+ * resulting actions.
  */
 class TaskStateMachine {
   private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,184 +80,104 @@ class TaskStateMachine {
   private static final AtomicLong ILLEGAL_TRANSITIONS =
       Stats.exportLong("scheduler_illegal_task_state_transitions");
 
-  // Re-declarations of statuses as wrapped state objects.
-  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
-  private static final State FAILED = State.create(ScheduleStatus.FAILED);
-  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
-  private static final State INIT = State.create(ScheduleStatus.INIT);
-  private static final State KILLED = State.create(ScheduleStatus.KILLED);
-  private static final State KILLING = State.create(ScheduleStatus.KILLING);
-  private static final State LOST = State.create(ScheduleStatus.LOST);
-  private static final State PENDING = State.create(ScheduleStatus.PENDING);
-  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
-  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
-  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
-  private static final State STARTING = State.create(ScheduleStatus.STARTING);
-  private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
-  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
-  @VisibleForTesting
-  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
-        }
-      });
-
-  private final String taskId;
-  private final WorkSink workSink;
-  private final StateMachine<State> stateMachine;
+  private final StateMachine<ScheduleStatus> stateMachine;  // States are plain ScheduleStatus values.
   private ScheduleStatus previousState = null;
-  private final Clock clock;
-
-  /**
-   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
-   * States must be equal for them to be considered equal.
-   */
-  private static class State {
-    private final ScheduleStatus state;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-      this.state = state;
-      this.mutation = mutation;
-    }
-
-    static State create(ScheduleStatus status) {
-      return create(status, Functions.<IScheduledTask>identity());
-    }
-
-    static State create(
-        ScheduleStatus status,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      return new State(status, mutation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof State)) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      State other = (State) o;
-      return state == other.state;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder()
-          .append(state)
-          .toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return state.toString();
-    }
 
-    private ScheduleStatus getState() {
-      return state;
-    }
-
-    private Function<IScheduledTask, IScheduledTask> getMutation() {
-      return mutation;
-    }
-  }
+  private final Set<SideEffect> sideEffects = Sets.newHashSet();  // Collected by addFollowup(...).
 
   /**
-   * A write-only work acceptor.
+   * Creates a new task state machine representing a non-existent task.  This allows for consistent
+   * state-reconciliation actions when the external system disagrees with the scheduler.
+   * The machine starts in the UNKNOWN state.
+   *
+   * @param name Name of the state machine, for logging.  Must not be blank.
    */
-  public interface WorkSink {
-    /**
-     * Appends external work that must be performed for a state machine transition to be fully
-     * complete.
-     *
-     * @param work Description of the work to be performed.
-     * @param stateMachine The state machine that the work is associated with.
-     * @param mutation Mutate operation to perform along with the state transition.
-     */
-    void addWork(
-        WorkCommand work,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation);
+  public TaskStateMachine(String name) {
+    this(name, Optional.<IScheduledTask>absent());
   }
 
   /**
-   * Creates a new task state machine.
-   *
-   * @param taskId ID of the task managed by this state machine.
+   * Creates a new task state machine representing an existent task.  The state machine will be
+   * named with the task's ID, and starts in the task's current status, which must not be UNKNOWN.
+   *
    * @param task Read-only task that this state machine manages.
-   * @param workSink Work sink to receive transition response actions
-   * @param clock Clock to use for reading the current time.
-   * @param initialState The state to begin the state machine at.  All legal transitions will be
-   *     added, but this allows the state machine to 'skip' states, for instance when a task is
-   *     loaded from a persistent store.
    */
-  public TaskStateMachine(
-      final String taskId,
-      final IScheduledTask task,
-      final WorkSink workSink,
-      final Clock clock,
-      final ScheduleStatus initialState) {
-
-    this.taskId = MorePreconditions.checkNotBlank(taskId);
-    this.workSink = checkNotNull(workSink);
-    this.clock = checkNotNull(clock);
-    checkNotNull(initialState);
-
-    @SuppressWarnings("unchecked")
-    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
-        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
-        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
-            addWorkClosure(WorkCommand.KILL)),
-        /* Remove a terminated task that is remotely removed. */
-        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
-    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
-      @SuppressWarnings("fallthrough")
-      @Override public void execute(Transition<State> transition) {
-        switch (transition.getTo().getState()) {
-          case ASSIGNED:
-          case STARTING:
-          case RUNNING:
-            addWork(WorkCommand.KILL);
-            break;
-
-          case LOST:
-            addWork(WorkCommand.KILL);
-            // fall through
-
-          case FINISHED:
-          case FAILED:
-          case KILLED:
-            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
-            break;
-
-          case UNKNOWN:
-            updateState(ScheduleStatus.LOST);
-            break;
-
-          default:
-            // No-op.
-        }
-      }
-    };
+  public TaskStateMachine(IScheduledTask task) {
+    this(Tasks.id(task), Optional.of(task));
+  }
+
+  private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+    MorePreconditions.checkNotBlank(name);
+    checkNotNull(task);
+
+    final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+    if (task.isPresent()) {
+      Preconditions.checkState(
+          initialState != UNKNOWN,
+          "A task that exists may not be in UNKNOWN state.");
+    } else {
+      Preconditions.checkState(
+          initialState == UNKNOWN,
+          "A task that does not exist must start un UNKNOWN state.");
+    }
+
+    Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+        ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+            // Kill a task that we believe to be terminated when an attempt is made to revive.
+            .add(
+                Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
+                addFollowupClosure(KILL)))
+            // Remove a terminated task that is remotely removed.
+            .add(Closures.filter(Transition.to(UNKNOWN), addFollowupClosure(DELETE)))
+            .build());
+
+    final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+        new Closure<Transition<ScheduleStatus>>() {
+          @Override public void execute(Transition<ScheduleStatus> transition) {
+            switch (transition.getTo()) {
+              case ASSIGNED:
+                addFollowup(KILL);
+                break;
+
+              case STARTING:
+                addFollowup(KILL);
+                break;
+
+              case RUNNING:
+                addFollowup(KILL);
+                break;
+
+              case LOST:
+                addFollowup(KILL);
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FINISHED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FAILED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case KILLED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case UNKNOWN:
+                addFollowupTransition(LOST);
+                break;
+
+              default:
+                // No-op.
+            }
+          }
+        };
 
     // To be called on a task transitioning into the FINISHED state.
     final Command rescheduleIfService = new Command() {
       @Override public void execute() {
-        if (task.getAssignedTask().getTask().isIsService()) {
-          addWork(WorkCommand.RESCHEDULE);
+        if (task.get().getAssignedTask().getTask().isIsService()) {
+          addFollowup(RESCHEDULE);
         }
       }
     };
@@ -249,27 +185,29 @@ class TaskStateMachine {
     // To be called on a task transitioning into the FAILED state.
     final Command incrementFailuresMaybeReschedule = new Command() {
       @Override public void execute() {
-        addWork(WorkCommand.INCREMENT_FAILURES);
+        addFollowup(INCREMENT_FAILURES);
 
         // Max failures is ignored for service task.
-        boolean isService = task.getAssignedTask().getTask().isIsService();
+        boolean isService = task.get().getAssignedTask().getTask().isIsService();
 
         // Max failures is ignored when set to -1.
-        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
-        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
-          addWork(WorkCommand.RESCHEDULE);
+        int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+        boolean belowMaxFailures =
+            (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+        if (isService || belowMaxFailures) {
+          addFollowup(RESCHEDULE);
         } else {
-          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+          LOG.info("Task " + name + " reached failure limit, not rescheduling");
         }
       }
     };
 
-    final Closure<Transition<State>> deleteIfKilling =
-        Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+    final Closure<Transition<ScheduleStatus>> deleteIfKilling =
+        Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
 
-    stateMachine = StateMachine.<State>builder(taskId)
+    stateMachine = StateMachine.<ScheduleStatus>builder(name)
         .logTransitions()
-        .initialState(State.create(initialState))
+        .initialState(initialState)
         .addState(
             Rule.from(INIT)
                 .to(PENDING, THROTTLED, UNKNOWN))
@@ -286,16 +224,15 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
                     KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -303,25 +240,24 @@ class TaskStateMachine {
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            // fall through
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(RESCHEDULE);
+                            addFollowup(KILL);
                             break;
 
-                          case UNKNOWN:
+                          case KILLING:
+                            addFollowup(KILL);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -330,20 +266,19 @@ class TaskStateMachine {
             Rule.from(STARTING)
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -351,25 +286,25 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
                             // The slave previously acknowledged that it had the task, and now
                             // stopped reporting it.
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -378,20 +313,19 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -399,19 +333,19 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
                            default:
@@ -455,23 +389,26 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<State>>() {
-              @Override public void execute(final Transition<State> transition) {
-                ScheduleStatus from = transition.getFrom().getState();
-                ScheduleStatus to = transition.getTo().getState();
-
-                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
-                    // Prevent an update when killing a pending task, since the task is deleted
-                    // prior to the update.
-                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
-                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
-                } else if (!transition.isAllowed()) {
-                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
-                  ILLEGAL_TRANSITIONS.incrementAndGet();
-                }
-
+            new Closure<Transition<ScheduleStatus>>() {
+              @Override public void execute(final Transition<ScheduleStatus> transition) {
                 if (transition.isValidStateChange()) {
+                  ScheduleStatus from = transition.getFrom();
+                  ScheduleStatus to = transition.getTo();
+
+                  // TODO(wfarner): Clean up this hack.  This is here to suppress unnecessary work
+                  // (save followed by delete), but it shows a wart with this catch-all behavior.
+                  // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+                  boolean pendingDeleteHack =
+                      !(((from == PENDING) || (from == THROTTLED)) && (to == KILLING));
+
+                  // Don't bother saving state of a task that is being removed.
+                  if ((to != UNKNOWN) && pendingDeleteHack) {
+                    addFollowup(SAVE_STATE);
+                  }
                   previousState = from;
+                } else {
+                  LOG.severe("Illegal state transition attempted: " + transition);
+                  ILLEGAL_TRANSITIONS.incrementAndGet();
                 }
               }
             }
@@ -485,56 +422,25 @@ class TaskStateMachine {
         .build();
   }
 
-  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-    return new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> item) {
-        addWork(work);
-      }
-    };
-  }
-
-  private void addWork(WorkCommand work) {
-    addWork(work, Functions.<IScheduledTask>identity());
+  private void addFollowup(Action action) {
+    addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
   }
 
-  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-    LOG.info("Adding work command " + work + " for " + this);
-    workSink.addWork(work, TaskStateMachine.this, mutation);
+  private void addFollowupTransition(ScheduleStatus status) {
+    addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status) {
-    return updateState(status, Functions.<IScheduledTask>identity());
+  private void addFollowup(SideEffect sideEffect) {
+    LOG.info("Adding work command " + sideEffect + " for " + this);
+    sideEffects.add(sideEffect);
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The (optional) audit message to associate with the transition.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
-    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
-  }
-
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
-   *
-   * @param status Status to apply to the task.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation) {
-
-    return updateState(status, mutation, Optional.<String>absent());
+  private Closure<Transition<ScheduleStatus>> addFollowupClosure(final Action action) {
+    return new Closure<Transition<ScheduleStatus>>() {
+      @Override public void execute(Transition<ScheduleStatus> item) {
+        addFollowup(action);
+      }
+    };
   }
 
   /**
@@ -543,59 +449,35 @@ class TaskStateMachine {
-   * will be appended to the work queue.
+   * will be included in the returned {@link TransitionResult}.
    *
    * @param status Status to apply to the task.
-   * @param auditMessage The audit message to associate with the transition.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
+   * @return Whether the state change was applied, and the side effects to perform in response.
    */
-  private synchronized boolean updateState(
-      final ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation,
-      final Optional<String> auditMessage) {
-
+  public synchronized TransitionResult updateState(final ScheduleStatus status) {
     checkNotNull(status);
-    checkNotNull(mutation);
-    checkNotNull(auditMessage);
+    Preconditions.checkState(sideEffects.isEmpty());
 
     /**
      * Don't bother applying noop state changes.  If we end up modifying task state without a
      * state transition (e.g. storing resource consumption of a running task), we need to find
      * a different way to suppress noop transitions.
      */
-    if (stateMachine.getState().getState() != status) {
-      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
-          new Function<IScheduledTask, IScheduledTask>() {
-            @Override public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask builder = task.newBuilder();
-              builder.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(status)
-                  .setMessage(auditMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(builder);
-            }
-          });
-      return stateMachine.transition(State.create(status, operation));
+    if (stateMachine.getState() == status) {
+      return new TransitionResult(false, ImmutableSet.<SideEffect>of());
     }
 
-    return false;
+    boolean success = stateMachine.transition(status);
+    ImmutableSet<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+    sideEffects.clear();
+    return new TransitionResult(success, transitionEffects);
   }
 
   /**
    * Fetch the current state from the state machine.
+   * TODO(wfarner): Consider removing, the caller should know this.
    *
    * @return The current state.
    */
   public synchronized ScheduleStatus getState() {
-    return stateMachine.getState().getState();
-  }
-
-  /**
-   * Gets the ID for the task that this state machine manages.
-   *
-   * @return The state machine's task ID.
-   */
-  public String getTaskId() {
-    return taskId;
+    return stateMachine.getState();
   }
 
   /**
@@ -611,6 +493,6 @@ class TaskStateMachine {
 
   @Override
   public String toString() {
-    return getTaskId();
+    return stateMachine.getName();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java b/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
new file mode 100644
index 0000000..15174bd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * The outcome of a state transition attempt, and the side effects to perform in response.
+ *
+ * @see TaskStateMachine
+ */
+public class TransitionResult {
+  private final boolean success;
+  private final ImmutableSet<SideEffect> sideEffects;
+
+  /**
+   * Creates a transition result with the given (non-null) side effects.
+   *
+   * @param success Whether the transition attempt relevant to this result was successful.
+   * @param sideEffects Actions that must be performed in response to the state transition.
+   */
+  public TransitionResult(boolean success, ImmutableSet<SideEffect> sideEffects) {
+    this.success = success;
+    this.sideEffects = Preconditions.checkNotNull(sideEffects);
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public ImmutableSet<SideEffect> getSideEffects() {
+    return sideEffects;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TransitionResult)) {
+      return false;
+    }
+
+    TransitionResult other = (TransitionResult) o;
+    return (success == other.success)
+        && Objects.equal(sideEffects, other.sideEffects);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(success, sideEffects);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("success", success)
+        .add("sideEffects", sideEffects)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
-  // Send an instruction for the runner of this task to kill the task.
-  KILL,
-  // Create a new state machine with a copy of this task.
-  RESCHEDULE,
-  // Update the task's state (schedule status) in the persistent store to match the state machine.
-  UPDATE_STATE,
-  // Delete this task from the persistent store.
-  DELETE,
-  // Increment the failure count for this task.
-  INCREMENT_FAILURES
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 53d0c85..984f506 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -198,6 +198,10 @@ public interface Storage {
 
   /**
    * Executes the unit of mutating {@code work}.
+   * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+   * performed after completion of the outer-most transaction.  As it stands, it's somewhat
+   * futile to try to achieve this within a transaction, since the local code does not know
+   * if the current transaction is nested.
    *
    * @param work The unit of work to execute.
    * @param <T> The type of result this unit of work produces.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
     /**
      * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
      * skipped, and no corresponding task will be returned.
+     * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
+     * and it prevents the caller from worrying about a bad query having broad impact.
      *
      * @param query Query to match tasks against.
      * @param mutator The mutate operation.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 5379300..a0525e5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -51,7 +52,6 @@ import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,6 +59,7 @@ import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
@@ -67,6 +68,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StateManagerImplTest extends EasyMockTest {
@@ -100,11 +102,6 @@ public class StateManagerImplTest extends EasyMockTest {
         rescheduleCalculator);
   }
 
-  @After
-  public void validateCompletion() {
-    assertTrue(stateManager.getStorage().getEvents().isEmpty());
-  }
-
   private static class StateChangeMatcher implements IArgumentMatcher {
     private final String taskId;
     private final ScheduleStatus from;
@@ -181,7 +178,7 @@ public class StateManagerImplTest extends EasyMockTest {
         .setStatus(PENDING)
         .setTaskEvents(ImmutableList.of(new TaskEvent()
             .setTimestamp(clock.nowMillis())
-            .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+            .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
             .setStatus(PENDING)))
         .setAssignedTask(new AssignedTask()
             .setInstanceId(3)
@@ -294,6 +291,92 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(unknownTask, RUNNING);
   }
 
+  private void noFlappingPenalty() {
+    expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+        .andReturn(0L);
+  }
+
+  @Test
+  public void testIncrementFailureCount() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    noFlappingPenalty();
+    expectStateTransitions(taskId2, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, FAILED);
+    IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+    assertEquals(1, rescheduledTask.getFailureCount());
+  }
+
+  @Test
+  public void testDoubleTransition() {
+    // Tests that a transition inducing another transition (STATE_CHANGE action) is performed.
+
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    noFlappingPenalty();
+    expectStateTransitions(taskId2, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, UNKNOWN);
+  }
+
+  @Test
+  public void testCasTaskPresent() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A);
+    assertFalse(stateManager.changeState(
+        taskId,
+        Optional.of(PENDING),
+        RUNNING,
+        Optional.<String>absent()));
+    assertTrue(stateManager.changeState(
+        taskId,
+        Optional.of(ASSIGNED),
+        FAILED,
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testCasTaskNotFound() {
+    control.replay();
+
+    assertFalse(stateManager.changeState(
+        "a",
+        Optional.of(PENDING),
+        ASSIGNED,
+        Optional.<String>absent()));
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,