You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 01:15:32 UTC

[1/2] Refactor StateManagerImpl and TaskStateMachine for less code and better readability.

Updated Branches:
  refs/heads/master e15988090 -> 3870df32b


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index f44ee58..153f82c 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -15,23 +15,26 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Function;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.TaskStateMachine.WorkSink;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,76 +46,43 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.WorkCommand.DELETE;
-import static org.apache.aurora.scheduler.state.WorkCommand.INCREMENT_FAILURES;
-import static org.apache.aurora.scheduler.state.WorkCommand.KILL;
-import static org.apache.aurora.scheduler.state.WorkCommand.RESCHEDULE;
-import static org.apache.aurora.scheduler.state.WorkCommand.UPDATE_STATE;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TaskStateMachineTest extends EasyMockTest {
+// TODO(wfarner): At this rate, it's probably best to exhaustively cover this class with a matrix
+// from every state to every state.
+public class TaskStateMachineTest {
 
-  private WorkSink workSink;
-  private FakeClock clock;
   private TaskStateMachine stateMachine;
 
   @Before
   public void setUp() {
-    workSink = createMock(WorkSink.class);
-    clock = new FakeClock();
-    stateMachine = makeStateMachine("test", makeTask(false));
+    stateMachine = makeStateMachine(makeTask(false));
   }
 
-  private TaskStateMachine makeStateMachine(String taskId, ScheduledTask builder) {
-    return new TaskStateMachine(
-        taskId,
-        IScheduledTask.build(builder),
-        workSink,
-        clock,
-        INIT);
+  private TaskStateMachine makeStateMachine(ScheduledTask builder) {
+    return new TaskStateMachine(IScheduledTask.build(builder));
   }
 
   @Test
   public void testSimpleTransition() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(DELETE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING);
-    assertEquals(INIT, stateMachine.getPreviousState());
-    transition(stateMachine, ASSIGNED);
-    assertEquals(PENDING, stateMachine.getPreviousState());
-    transition(stateMachine, STARTING);
-    assertEquals(ASSIGNED, stateMachine.getPreviousState());
-    transition(stateMachine, RUNNING);
-    assertEquals(STARTING, stateMachine.getPreviousState());
-    transition(stateMachine, FINISHED);
-    assertEquals(RUNNING, stateMachine.getPreviousState());
-    transition(stateMachine, UNKNOWN);
-    assertEquals(FINISHED, stateMachine.getPreviousState());
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+    legalTransition(UNKNOWN, Action.DELETE);
   }
 
   @Test
   public void testServiceRescheduled() {
-    stateMachine = makeStateMachine("test", makeTask(true));
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+    stateMachine = makeStateMachine(makeTask(true));
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
   }
 
   @Test
@@ -120,12 +90,12 @@ public class TaskStateMachineTest extends EasyMockTest {
     Set<ScheduleStatus> terminalStates = Tasks.TERMINAL_STATES;
 
     for (ScheduleStatus endState : terminalStates) {
-      stateMachine = makeStateMachine("test", makeTask(false));
-      expectWork(UPDATE_STATE).times(5);
+      stateMachine = makeStateMachine(makeTask(false));
+      Set<SideEffect.Action> finalActions = Sets.newHashSet(Action.SAVE_STATE);
 
       switch (endState) {
         case FAILED:
-          expectWork(INCREMENT_FAILURES);
+          finalActions.add(Action.INCREMENT_FAILURES);
           break;
 
         case FINISHED:
@@ -133,138 +103,88 @@ public class TaskStateMachineTest extends EasyMockTest {
 
         case KILLED:
         case LOST:
-          expectWork(RESCHEDULE);
+          finalActions.add(Action.RESCHEDULE);
           break;
 
         case KILLING:
-          expectWork(KILL);
+          finalActions.add(Action.KILL);
           break;
 
         default:
           fail("Unknown state " + endState);
       }
 
-      control.replay();
-
-      transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, endState);
+      expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+      legalTransition(endState, finalActions);
 
       for (ScheduleStatus badTransition : terminalStates) {
-        transition(stateMachine, badTransition);
+        illegalTransition(badTransition);
       }
-
-      control.verify();
-      control.reset();
     }
-
-    control.replay();  // Needed so the teardown verify doesn't break.
   }
 
   @Test
   public void testUnknownTask() {
-    expectWork(KILL);
-
-    control.replay();
+    stateMachine = new TaskStateMachine("id");
 
-    transition(stateMachine, UNKNOWN, RUNNING);
+    illegalTransition(RUNNING, Action.KILL);
   }
 
   @Test
   public void testLostTask() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, LOST);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE);
   }
 
   @Test
   public void testKilledPending() {
-    expectWork(UPDATE_STATE);
-    expectWork(DELETE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, KILLING);
+    expectUpdateStateOnTransitionTo(PENDING);
+    legalTransition(KILLING, Action.DELETE);
   }
 
   @Test
   public void testMissingStartingRescheduledImmediately() {
-    ScheduledTask task = makeTask(false);
-    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
-    stateMachine = makeStateMachine("test", task);
-
-    expectWork(UPDATE_STATE).times(4);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, UNKNOWN);
-    assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+    illegalTransition(UNKNOWN,
+        ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
   }
 
   @Test
   public void testMissingRunningRescheduledImmediately() {
-    ScheduledTask task = makeTask(false);
-    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
-    stateMachine = makeStateMachine("test", task);
-
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, UNKNOWN);
-    assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    illegalTransition(UNKNOWN,
+        ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
   }
 
   @Test
   public void testRestartedTask() {
-    expectWork(UPDATE_STATE).times(6);
-    expectWork(KILL);
-    expectWork(RESCHEDULE);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, FINISHED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
   }
 
   @Test
   public void testRogueRestartedTask() {
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(KILL).times(2);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, RUNNING);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+    illegalTransition(RUNNING, Action.KILL);
   }
 
   @Test
   public void testPendingRestartedTask() {
-    expectWork(UPDATE_STATE).times(1);
-
-    control.replay();
-
+    expectUpdateStateOnTransitionTo(PENDING);
     // PENDING -> RESTARTING should not be allowed.
-    transition(stateMachine, PENDING, RESTARTING);
+    illegalTransition(RESTARTING);
   }
 
   @Test
   public void testAllowsSkipStartingAndRunning() {
-    expectWork(UPDATE_STATE).times(3);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, FINISHED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, FINISHED);
   }
 
   @Test
   public void testAllowsSkipRunning() {
-    expectWork(UPDATE_STATE).times(4);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, FINISHED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, FINISHED);
   }
 
   @Test
@@ -272,23 +192,15 @@ public class TaskStateMachineTest extends EasyMockTest {
     ScheduledTask task = makeTask(false);
     task.getAssignedTask().getTask().setMaxTaskFailures(10);
     task.setFailureCount(8);
-    stateMachine = makeStateMachine("test", task);
-
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-    expectWork(INCREMENT_FAILURES);
+    stateMachine = makeStateMachine(task);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
 
     ScheduledTask rescheduled = task.deepCopy();
     rescheduled.setFailureCount(9);
-    TaskStateMachine rescheduledMachine = makeStateMachine("test2", rescheduled);
-    expectWork(UPDATE_STATE, rescheduledMachine).times(5);
-    expectWork(INCREMENT_FAILURES, rescheduledMachine);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
-
-    transition(rescheduledMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    stateMachine = makeStateMachine(rescheduled);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
   }
 
   @Test
@@ -296,62 +208,314 @@ public class TaskStateMachineTest extends EasyMockTest {
     ScheduledTask task = makeTask(false);
     task.getAssignedTask().getTask().setMaxTaskFailures(-1);
     task.setFailureCount(1000);
-    stateMachine = makeStateMachine("test", task);
+    stateMachine = makeStateMachine(task);
 
-    expectWork(UPDATE_STATE).times(5);
-    expectWork(RESCHEDULE);
-    expectWork(INCREMENT_FAILURES);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
   }
 
   @Test
   public void testKillingRequest() {
-    expectWork(UPDATE_STATE).times(6);
-    expectWork(KILL);
-
-    control.replay();
-
-    transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
+    expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+    legalTransition(KILLING, Action.KILL, Action.SAVE_STATE);
+    expectUpdateStateOnTransitionTo(KILLED);
   }
 
   @Test
   public void testThrottledTask() {
-    expectWork(UPDATE_STATE).times(2);
+    expectUpdateStateOnTransitionTo(THROTTLED, PENDING);
+  }
 
-    control.replay();
+  private static final Function<Action, SideEffect> TO_SIDE_EFFECT =
+      new Function<Action, SideEffect>() {
+        @Override public SideEffect apply(Action action) {
+          return new SideEffect(action, Optional.<ScheduleStatus>absent());
+        }
+      };
+
+  private void legalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    legalTransition(state, ImmutableSet.copyOf(expectedActions));
+  }
 
-    transition(stateMachine, THROTTLED, PENDING);
+  private void legalTransition(ScheduleStatus state, Set<SideEffect.Action> expectedActions) {
+    ScheduleStatus initialStatus = stateMachine.getState();
+    TransitionResult result = stateMachine.updateState(state);
+    assertTrue("Transition to " + state + " was not successful", result.isSuccess());
+    assertEquals(initialStatus, stateMachine.getPreviousState());
+    assertEquals(state, stateMachine.getState());
+    assertEquals(
+        FluentIterable.from(expectedActions).transform(TO_SIDE_EFFECT).toSet(),
+        result.getSideEffects());
   }
 
-  private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
+  private void expectUpdateStateOnTransitionTo(ScheduleStatus... states) {
     for (ScheduleStatus status : states) {
-      stateMachine.updateState(status);
+      legalTransition(status, Action.SAVE_STATE);
     }
   }
 
-  private IExpectationSetters<Void> expectWork(WorkCommand work) {
-    return expectWork(work, stateMachine);
+  private void illegalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    illegalTransition(
+        state,
+        FluentIterable.from(
+            ImmutableSet.copyOf(expectedActions)).transform(TO_SIDE_EFFECT).toSet());
   }
 
-  private IExpectationSetters<Void> expectWork(WorkCommand work, TaskStateMachine machine) {
-    workSink.addWork(
-        eq(work),
-        eq(machine),
-        EasyMock.<Function<IScheduledTask, IScheduledTask>>anyObject());
-    return expectLastCall();
+  private void illegalTransition(ScheduleStatus state, Set<SideEffect> sideEffects) {
+    ScheduleStatus initialStatus = stateMachine.getState();
+    TransitionResult result = stateMachine.updateState(state);
+    assertEquals(initialStatus, stateMachine.getState());
+    assertFalse(result.isSuccess());
+    assertEquals(sideEffects, result.getSideEffects());
   }
 
   private static ScheduledTask makeTask(boolean service) {
     return new ScheduledTask()
+        .setStatus(INIT)
         .setAssignedTask(
             new AssignedTask()
+                .setTaskId("test")
                 .setTask(
                     new TaskConfig()
                         .setOwner(new Identity().setRole("roleA"))
                         .setJobName("jobA")
                         .setIsService(service)));
   }
+
+  private static final TransitionResult LEGAL_NO_ACTION =
+      new TransitionResult(true, ImmutableSet.<SideEffect>of());
+  private static final TransitionResult SAVE = new TransitionResult(
+      true,
+      ImmutableSet.of(new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_AND_KILL = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_AND_RESCHEDULE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.RESCHEDULE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult SAVE_KILL_AND_RESCHEDULE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.RESCHEDULE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult ILLEGAL_KILL = new TransitionResult(
+      false,
+      ImmutableSet.of(new SideEffect(Action.KILL, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult RECORD_FAILURE = new TransitionResult(
+      true,
+      ImmutableSet.of(
+          new SideEffect(Action.SAVE_STATE, Optional.<ScheduleStatus>absent()),
+          new SideEffect(Action.INCREMENT_FAILURES, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult DELETE_TASK = new TransitionResult(
+      true,
+      ImmutableSet.of(new SideEffect(Action.DELETE, Optional.<ScheduleStatus>absent())));
+  private static final TransitionResult MARK_LOST = new TransitionResult(
+      false,
+      ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
+
+  private static final class TestCase {
+    private final boolean taskPresent;
+    private final ScheduleStatus from;
+    private final ScheduleStatus to;
+
+    private TestCase(boolean taskPresent, ScheduleStatus from, ScheduleStatus to) {
+      this.taskPresent = taskPresent;
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskPresent, from, to);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TestCase)) {
+        return false;
+      }
+
+      TestCase other = (TestCase) o;
+      return (taskPresent == other.taskPresent)
+          && (from == other.from)
+          && (to == other.to);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("taskPresent", taskPresent)
+          .add("from", from)
+          .add("to", to)
+          .toString();
+    }
+  }
+
+  private static final Map<TestCase, TransitionResult> EXPECTATIONS =
+      ImmutableMap.<TestCase, TransitionResult>builder()
+          .put(new TestCase(true, INIT, THROTTLED), SAVE)
+          .put(new TestCase(true, INIT, PENDING), SAVE)
+          .put(new TestCase(false, INIT, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, INIT, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, INIT, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, INIT, UNKNOWN), LEGAL_NO_ACTION)
+          .put(new TestCase(true, THROTTLED, PENDING), SAVE)
+          .put(new TestCase(true, THROTTLED, KILLING), DELETE_TASK)
+          .put(new TestCase(false, THROTTLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, THROTTLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, THROTTLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PENDING, ASSIGNED), SAVE)
+          .put(new TestCase(false, PENDING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, PENDING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, PENDING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PENDING, KILLING), DELETE_TASK)
+          .put(new TestCase(false, ASSIGNED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, STARTING), SAVE)
+          .put(new TestCase(false, ASSIGNED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, RUNNING), SAVE)
+          .put(new TestCase(false, ASSIGNED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, FINISHED), SAVE)
+          .put(new TestCase(true, ASSIGNED, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, ASSIGNED, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, ASSIGNED, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, ASSIGNED, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, STARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, STARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, STARTING, RUNNING), SAVE)
+          .put(new TestCase(false, STARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, STARTING, FINISHED), SAVE)
+          .put(new TestCase(true, STARTING, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, STARTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, STARTING, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, STARTING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, STARTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(false, RUNNING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, RUNNING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, RUNNING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, RUNNING, FINISHED), SAVE)
+          .put(new TestCase(true, RUNNING, PREEMPTING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, RESTARTING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, RUNNING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RUNNING, KILLING), SAVE_AND_KILL)
+          .put(new TestCase(true, RUNNING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RUNNING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, FINISHED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, FINISHED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, PREEMPTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, FINISHED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, FAILED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, KILLING), SAVE)
+          .put(new TestCase(true, PREEMPTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(true, PREEMPTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, RESTARTING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, FINISHED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, FAILED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, KILLED), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, KILLING), SAVE)
+          .put(new TestCase(true, RESTARTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(true, RESTARTING, UNKNOWN), MARK_LOST)
+          .put(new TestCase(true, FAILED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, FAILED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, KILLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, KILLING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, KILLING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, FINISHED), SAVE)
+          .put(new TestCase(true, KILLING, FAILED), SAVE)
+          .put(new TestCase(true, KILLING, KILLED), SAVE)
+          .put(new TestCase(true, KILLING, LOST), SAVE)
+          .put(new TestCase(true, KILLING, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(true, LOST, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, LOST, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, UNKNOWN), DELETE_TASK)
+          .put(new TestCase(false, UNKNOWN, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(false, UNKNOWN, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(false, UNKNOWN, RUNNING), ILLEGAL_KILL)
+          .build();
+
+  @Test
+  public void testAllTransitions() {
+    for (ScheduleStatus from : ScheduleStatus.values()) {
+      for (ScheduleStatus to : ScheduleStatus.values()) {
+        for (Boolean taskPresent : ImmutableList.of(Boolean.TRUE, Boolean.FALSE)) {
+          TestCase testCase = new TestCase(taskPresent, from, to);
+
+          TransitionResult expectation = EXPECTATIONS.get(testCase);
+          if (expectation == null) {
+            expectation = new TransitionResult(false, ImmutableSet.<SideEffect>of());
+          }
+
+          TaskStateMachine machine;
+          if (taskPresent) {
+            // Cannot create a state machine for an UNKNOWN task that is in the store.
+            boolean expectException = from == UNKNOWN;
+            try {
+              machine =
+                  new TaskStateMachine(IScheduledTask.build(makeTask(false).setStatus(from)));
+              if (expectException) {
+                fail();
+              }
+            } catch (IllegalStateException e) {
+              if (!expectException) {
+                throw e;
+              } else {
+                continue;
+              }
+            }
+          } else {
+            machine = new TaskStateMachine("name");
+          }
+
+          assertEquals(
+              "Unexpected behavor for " + testCase,
+              expectation,
+              machine.updateState(to));
+        }
+      }
+    }
+  }
 }


[2/2] git commit: Refactor StateManagerImpl and TaskStateMachine for less code and better readability.

Posted by wf...@apache.org.
Refactor StateManagerImpl and TaskStateMachine for less code and better
readability.

The "big picture" for this change is that the closures inside
TaskStateMachine no longer drop items onto a work queue that feeds back into
StateManagerImpl.  Instead, the state machine returns these actions in a
TransitionResult.  I intend to improve this further in the future by exposing
only a helper function in TaskStateMachine, to guarantee one-time-use semantics.

Reviewed at https://reviews.apache.org/r/16873/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/3870df32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/3870df32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/3870df32

Branch: refs/heads/master
Commit: 3870df32bec25a2d0368b44c4760607942315ab3
Parents: e159880
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 28 16:09:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 28 16:09:49 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/state/SideEffect.java      | 106 ++++
 .../scheduler/state/SideEffectStorage.java      | 169 ------
 .../scheduler/state/StateManagerImpl.java       | 537 ++++++++++---------
 .../scheduler/state/TaskStateMachine.java       | 520 +++++++-----------
 .../scheduler/state/TransitionResult.java       |  73 +++
 .../aurora/scheduler/state/WorkCommand.java     |  33 --
 .../aurora/scheduler/storage/Storage.java       |   4 +
 .../aurora/scheduler/storage/TaskStore.java     |   2 +
 .../scheduler/state/StateManagerImplTest.java   |  97 +++-
 .../scheduler/state/TaskStateMachineTest.java   | 512 ++++++++++++------
 10 files changed, 1084 insertions(+), 969 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..04e6c7e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * Descriptions of the different types of external work commands that task state machines may
+ * trigger.
+ */
+class SideEffect {
+  private final Action action;
+  private final Optional<ScheduleStatus> nextState;
+
+  SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+    this.action = action;
+    if (action == Action.STATE_CHANGE) {
+      Preconditions.checkArgument(
+          nextState.isPresent(),
+          "A next state must be provided for a state change action.");
+    }
+    this.nextState = nextState;
+  }
+
+  public Action getAction() {
+    return action;
+  }
+
+  public Optional<ScheduleStatus> getNextState() {
+    return nextState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equal(action, other.action)
+        && Objects.equal(nextState, other.nextState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(action, nextState);
+  }
+
+  @Override
+  public String toString() {
+    if (nextState.isPresent()) {
+      return action.toString() + " " + nextState.get();
+    } else {
+      return action.toString();
+    }
+  }
+
+  enum Action {
+    /**
+     * Send an instruction for the runner of this task to kill the task.
+     */
+    KILL,
+
+    /**
+     * Create a new state machine with a copy of this task.
+     */
+    RESCHEDULE,
+
+    /**
+     * Update the task's state (schedule status) in the persistent store to match the state machine.
+     */
+    SAVE_STATE,
+
+    /**
+     * Delete this task from the persistent store.
+     */
+    DELETE,
+
+    /**
+     * Increment the failure count for this task.
+     */
+    INCREMENT_FAILURES,
+
+    /**
+     * Perform an additional state change on the task.
+     */
+    STATE_CHANGE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
-  private final Queue<PubsubEvent> events = Lists.newLinkedList();
-  @VisibleForTesting
-  Queue<PubsubEvent> getEvents() {
-    return events;
-  }
-
-  private AtomicBoolean inOperation = new AtomicBoolean(false);
-
-  private final Storage storage;
-  private final OperationFinalizer operationFinalizer;
-  private final EventSink eventSink;
-
-  interface OperationFinalizer {
-    /**
-     * Performs any work necessary to complete the operation.
-     * This is executed in the context of a write operation, immediately after the work
-     * executes normally.
-     * NOTE: At present, this is executed for every nesting level of operations, rather than
-     * at the completion of the top-level operation.
-     * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
-     * for more detail.
-     *
-     * @param work Work to finalize.
-     * @param storeProvider Mutable store reference.
-     */
-    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
-  }
-
-  SideEffectStorage(
-      Storage storage,
-      OperationFinalizer operationFinalizer,
-      EventSink eventSink) {
-
-    this.storage = checkNotNull(storage);
-    this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.eventSink = checkNotNull(eventSink);
-  }
-
-  /**
-   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
-   *
-   * @param work Work to perform.
-   * @param <T> Work return type
-   * @param <E> Work exception type.
-   * @return The work return value.
-   * @throws E The work exception.
-   */
-  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-    return storage.write(executeSideEffectsAfter(work));
-  }
-
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  /**
-   * Work that has side effects external to the storage system.
-   * Work may add side effect and pubsub events, which will be executed/sent upon normal
-   * completion of the operation.
-   *
-   * @param <T> Work return type.
-   * @param <E> Work exception type.
-   */
-  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
-    protected final void addTaskEvent(PubsubEvent notice) {
-      Preconditions.checkState(inOperation.get());
-      events.add(Preconditions.checkNotNull(notice));
-    }
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions.
-   *
-   * @param <T>   Work return type.
-   */
-  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
-  }
-
-  /**
-   * Work with side effects that does not have a return value.
-   *
-   * @param <E> Work exception type.
-   */
-  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-      execute(storeProvider);
-      return null;
-    }
-
-    abstract void execute(MutableStoreProvider storeProvider) throws E;
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions or have a return
-   * value.
-   */
-  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
-  }
-
-  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
-      final SideEffectWork<T, E> work) {
-
-    return new MutateWork<T, E>() {
-      @Override public T apply(MutableStoreProvider storeProvider) throws E {
-        boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
-        try {
-          T result = work.apply(storeProvider);
-
-          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
-          // seems wrong.  Double-check whether this is necessary, or if only the top-level
-          // operation should be executing the finalizer.  Update doc on OperationFinalizer
-          // once this is assessed.
-          operationFinalizer.finalize(work, storeProvider);
-          if (topLevelOperation) {
-            while (!events.isEmpty()) {
-              eventSink.post(events.remove());
-            }
-          }
-          return result;
-        } finally {
-          if (topLevelOperation) {
-            inOperation.set(false);
-          }
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 2b8ca09..6fee43c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,33 +15,39 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
@@ -49,9 +55,10 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -60,96 +67,27 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Iterables.transform;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
 
 /**
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
  * persisted state machine transitions, and their side-effects.
- *
- * TODO(wfarner): This class is due for an overhaul.  There are several aspects of it that could
- * probably be made much simpler.  Specifically, the workQueue is particularly difficult to reason
- * about.
  */
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  @VisibleForTesting
-  SideEffectStorage getStorage() {
-    return storage;
-  }
-
-  // Work queue to receive state machine side effect work.
-  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
-  // a delete is always processed after a state transition.
-  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
-      new Comparator<WorkEntry>() {
-        @Override public int compare(WorkEntry a, WorkEntry b) {
-          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
-            return (a.command == WorkCommand.DELETE) ? 1 : -1;
-          } else {
-            return 0;
-          }
-        }
-      });
-
-  // Adapt the work queue into a sink.
-  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
-      @Override public void addWork(
-          WorkCommand work,
-          TaskStateMachine stateMachine,
-          Function<IScheduledTask, IScheduledTask> mutation) {
-
-        workQueue.add(new WorkEntry(work, stateMachine, mutation));
-      }
-    };
-
-  private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
-      new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
-        @Override public IScheduledTask apply(Map.Entry<Integer, ITaskConfig> entry) {
-          ITaskConfig task = entry.getValue();
-          AssignedTask assigned = new AssignedTask()
-              .setTaskId(taskIdGenerator.generate(task, entry.getKey()))
-              .setInstanceId(entry.getKey())
-              .setTask(task.newBuilder());
-          return IScheduledTask.build(new ScheduledTask()
-              .setStatus(INIT)
-              .setAssignedTask(assigned));
-        }
-      };
-
-  private final SideEffectStorage storage;
+  private final Storage storage;
   private final Clock clock;
   private final Driver driver;
   private final TaskIdGenerator taskIdGenerator;
+  private final EventSink eventSink;
   private final RescheduleCalculator rescheduleCalculator;
 
-  /**
-   * An item of work on the work queue.
-   */
-  private static class WorkEntry {
-    private final WorkCommand command;
-    private final TaskStateMachine stateMachine;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    WorkEntry(
-        WorkCommand command,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      this.command = command;
-      this.stateMachine = stateMachine;
-      this.mutation = mutation;
-    }
-  }
-
   @Inject
   StateManagerImpl(
       final Storage storage,
@@ -159,21 +97,22 @@ public class StateManagerImpl implements StateManager {
       EventSink eventSink,
       RescheduleCalculator rescheduleCalculator) {
 
-    checkNotNull(storage);
+    this.storage = checkNotNull(storage);
     this.clock = checkNotNull(clock);
-
-    OperationFinalizer finalizer = new OperationFinalizer() {
-      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
-        processWorkQueueInWriteOperation(work, store);
-      }
-    };
-
-    this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
+    this.eventSink = checkNotNull(eventSink);
     this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+  }
 
-    Stats.exportSize("work_queue_depth", workQueue);
+  private IScheduledTask createTask(int instanceId, ITaskConfig template) {
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId(taskIdGenerator.generate(template, instanceId))
+        .setInstanceId(instanceId)
+        .setTask(template.newBuilder());
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(INIT)
+        .setAssignedTask(assigned));
   }
 
   @Override
@@ -181,15 +120,23 @@ public class StateManagerImpl implements StateManager {
     checkNotNull(tasks);
 
     // Done outside the write transaction to minimize the work done inside a transaction.
-    final Set<IScheduledTask> scheduledTasks =
-        ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
+    final Set<IScheduledTask> scheduledTasks = FluentIterable.from(tasks.entrySet())
+        .transform(new Function<Entry<Integer, ITaskConfig>, IScheduledTask>() {
+          @Override public IScheduledTask apply(Entry<Integer, ITaskConfig> entry) {
+            return createTask(entry.getKey(), entry.getValue());
+          }
+        }).toSet();
 
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
         for (IScheduledTask task : scheduledTasks) {
-          createStateMachine(task).updateState(PENDING);
+          updateTaskAndExternalState(
+              Tasks.id(task),
+              Optional.of(task),
+              PENDING,
+              Optional.<String>absent());
         }
       }
     });
@@ -202,54 +149,55 @@ public class StateManagerImpl implements StateManager {
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
-      @Override
-      public Boolean apply(TaskStateMachine stateMachine) {
-        return stateMachine.updateState(newState, auditMessage);
-      }
-    });
+    return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
   }
 
   @Override
   public IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts) {
+      final String taskId,
+      final String slaveHost,
+      final SlaveID slaveId,
+      final Set<Integer> assignedPorts) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
+    checkNotNull(slaveId);
     checkNotNull(assignedPorts);
 
-    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
-    return mutation.getAssignedTask();
-  }
-
-  private boolean changeState(
-      final String taskId,
-      final Optional<ScheduleStatus> casState,
-      final Function<TaskStateMachine, Boolean> stateChange) {
-
-    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
-      @Override public Boolean apply(MutableStoreProvider storeProvider) {
-        IScheduledTask task = Iterables.getOnlyElement(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-            null);
-        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
-          return false;
-        }
+    return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+      @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+        boolean success = updateTaskAndExternalState(
+            Optional.<ScheduleStatus>absent(),
+            taskId,
+            ASSIGNED,
+            Optional.<String>absent());
+
+        Preconditions.checkState(
+            success,
+            "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+        Query.Builder query = Query.taskScoped(taskId);
+        storeProvider.getUnsafeTaskStore().mutateTasks(query,
+            new Function<IScheduledTask, IScheduledTask>() {
+              @Override
+              public IScheduledTask apply(IScheduledTask task) {
+                ScheduledTask builder = task.newBuilder();
+                AssignedTask assigned = builder.getAssignedTask();
+                assigned.setAssignedPorts(
+                    getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+                assigned.setSlaveHost(slaveHost)
+                    .setSlaveId(slaveId.getValue());
+                return IScheduledTask.build(builder);
+              }
+            });
 
-        return stateChange.apply(getStateMachine(taskId, task));
+        return Iterables.getOnlyElement(
+            Iterables.transform(
+                storeProvider.getTaskStore().fetchTasks(query),
+                Tasks.SCHEDULED_TO_ASSIGNED));
       }
     });
   }
 
-  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
-    IAssignedTask getAssignedTask();
-  }
-
   private static Map<String, Integer> getNameMappedPorts(
       Set<String> portNames,
       Set<Integer> allocatedPorts) {
@@ -275,164 +223,219 @@ public class StateManagerImpl implements StateManager {
     return ports;
   }
 
-  private TaskAssignMutation assignHost(
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Set<Integer> assignedPorts) {
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            LOG.log(Level.SEVERE, "Failed to get self hostname.");
+            throw Throwables.propagate(e);
+          }
+        }
+      });
 
-    final TaskMutation mutation = new TaskMutation() {
-      @Override public IScheduledTask apply(IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        AssignedTask assigned = builder.getAssignedTask();
-        assigned.setAssignedPorts(
-            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-        assigned.setSlaveHost(slaveHost)
-            .setSlaveId(slaveId.getValue());
-        return IScheduledTask.build(builder);
-      }
-    };
+  private boolean updateTaskAndExternalState(
+      final Optional<ScheduleStatus> casState,
+      final String taskId,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
 
-    return new TaskAssignMutation() {
-      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
-      @Override public IAssignedTask getAssignedTask() {
-        return assignedTask.get();
-      }
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null));
 
-      @Override public Boolean apply(final TaskStateMachine stateMachine) {
-        TaskMutation wrapper = new TaskMutation() {
-          @Override public IScheduledTask apply(IScheduledTask task) {
-            IScheduledTask mutated = mutation.apply(task);
-            Preconditions.checkState(
-                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
-                "More than one result was found for an identity query.");
-            return mutated;
-          }
-        };
-        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+        // CAS operation fails if the task does not exist, or the states don't match.
+        if (casState.isPresent()
+            && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+          return false;
+        }
+
+        return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
       }
-    };
+    });
   }
 
-  private void processWorkQueueInWriteOperation(
-      SideEffectWork<?, ?> sideEffectWork,
-      MutableStoreProvider storeProvider) {
-
-    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
-      final TaskStateMachine stateMachine = work.stateMachine;
-
-      if (work.command == WorkCommand.KILL) {
-        driver.killTask(stateMachine.getTaskId());
-      } else {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        String taskId = stateMachine.getTaskId();
-        Query.Builder idQuery = Query.taskScoped(taskId);
-
-        switch (work.command) {
-          case RESCHEDULE:
-            IScheduledTask ancestor = Iterables.getOnlyElement(taskStore.fetchTasks(idQuery));
-
-            ScheduledTask builder = ancestor.newBuilder();
-            builder.getAssignedTask().unsetSlaveId();
-            builder.getAssignedTask().unsetSlaveHost();
-            builder.getAssignedTask().unsetAssignedPorts();
-            builder.unsetTaskEvents();
-            builder.setAncestorId(taskId);
-            String newTaskId = taskIdGenerator.generate(
-                ITaskConfig.build(builder.getAssignedTask().getTask()),
-                builder.getAssignedTask().getInstanceId());
-            builder.getAssignedTask().setTaskId(newTaskId);
-
-            LOG.info("Task being rescheduled: " + taskId);
-
-            IScheduledTask task = IScheduledTask.build(builder);
-            taskStore.saveTasks(ImmutableSet.of(task));
-
-            ScheduleStatus newState;
-            String auditMessage;
-            long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(ancestor);
-            if (flapPenaltyMs > 0) {
-              newState = THROTTLED;
-              auditMessage =
-                  String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
-            } else {
-              newState = PENDING;
-              auditMessage = "Rescheduled";
-            }
-
-            createStateMachine(task).updateState(newState, Optional.of(auditMessage));
-            break;
-
-          case UPDATE_STATE:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return work.mutation.apply(
-                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
-              }
-            });
-            sideEffectWork.addTaskEvent(
-                PubsubEvent.TaskStateChange.transition(
-                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
-                    stateMachine.getPreviousState()));
-            break;
-
-          case DELETE:
-            deleteTasks(ImmutableSet.of(taskId));
-            break;
-
-          case INCREMENT_FAILURES:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return IScheduledTask.build(
-                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+  private static final Function<SideEffect, Action> GET_ACTION =
+      new Function<SideEffect, Action>() {
+        @Override public Action apply(SideEffect sideEffect) {
+          return sideEffect.getAction();
+        }
+      };
+
+  private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+      Action.INCREMENT_FAILURES,
+      Action.SAVE_STATE,
+      Action.STATE_CHANGE,
+      Action.RESCHEDULE,
+      Action.KILL,
+      Action.DELETE);
+
+  static {
+    // Sanity check to ensure no actions are missing.
+    Preconditions.checkState(
+        ImmutableSet.copyOf(ACTIONS_IN_ORDER).equals(ImmutableSet.copyOf(Action.values())),
+        "Not all actions are included in ordering.");
+  }
+
+  // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+  // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+  // (thus not carrying forward the failure increment).
+  private static final Ordering<SideEffect> ACTION_ORDER =
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
+
+  private boolean updateTaskAndExternalState(
+      final String taskId,
+      // Note: This argument is deliberately non-final, and should not be made final.
+      // This is because using the captured value within the storage operation below is
+      // highly risky, since it doesn't necessarily represent the value in storage.
+      // As a result, it would be easy to accidentally clobber mutations.
+      Optional<IScheduledTask> task,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
+
+    if (task.isPresent()) {
+      Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+    }
+
+    final List<PubsubEvent> events = Lists.newArrayList();
+
+    final TaskStateMachine stateMachine = task.isPresent()
+        ? new TaskStateMachine(task.get())
+        : new TaskStateMachine(taskId);
+
+    boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        TransitionResult result = stateMachine.updateState(targetState);
+        Query.Builder query = Query.taskScoped(taskId);
+
+        for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+          Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+              Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+          switch (sideEffect.getAction()) {
+            case INCREMENT_FAILURES:
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  return IScheduledTask.build(
+                      task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+                }
+              });
+              break;
+
+            case SAVE_STATE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  ScheduledTask mutableTask = task.newBuilder();
+                  mutableTask.setStatus(stateMachine.getState());
+                  mutableTask.addToTaskEvents(new TaskEvent()
+                      .setTimestamp(clock.nowMillis())
+                      .setStatus(targetState)
+                      .setMessage(transitionMessage.orNull())
+                      .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+                  return IScheduledTask.build(mutableTask);
+                }
+              });
+              events.add(
+                  PubsubEvent.TaskStateChange.transition(
+                      Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+                      stateMachine.getPreviousState()));
+              break;
+
+            case STATE_CHANGE:
+              updateTaskAndExternalState(
+                  Optional.<ScheduleStatus>absent(),
+                  taskId,
+                  sideEffect.getNextState().get(),
+                  Optional.<String>absent());
+              break;
+
+            case RESCHEDULE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+              LOG.info("Task being rescheduled: " + taskId);
+
+              ScheduleStatus newState;
+              String auditMessage;
+              long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
+              if (flapPenaltyMs > 0) {
+                newState = THROTTLED;
+                auditMessage =
+                    String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+              } else {
+                newState = PENDING;
+                auditMessage = "Rescheduled";
               }
-            });
-            break;
 
-          default:
-            LOG.severe("Unrecognized work command type " + work.command);
+              IScheduledTask newTask = IScheduledTask.build(createTask(
+                  upToDateTask.get().getAssignedTask().getInstanceId(),
+                  upToDateTask.get().getAssignedTask().getTask())
+                  .newBuilder()
+                  .setFailureCount(upToDateTask.get().getFailureCount())
+                  .setAncestorId(taskId));
+              storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+              updateTaskAndExternalState(
+                  Tasks.id(newTask),
+                  Optional.of(newTask),
+                  newState,
+                  Optional.of(auditMessage));
+              break;
+
+            case KILL:
+              driver.killTask(taskId);
+              break;
+
+            case DELETE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+              break;
+
+            default:
+              throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+          }
         }
+
+        return result.isSuccess();
       }
+    });
+
+    // Note (AURORA-138): Delaying events until after the write operation is somewhat futile, since
+    // the state may not actually have been written to the durable store at that point
+    // (e.g. if this is a nested transaction). Ideally, Storage would add a facility to attach
+    // side-effects that are performed after the outer-most transaction completes (meaning state
+    // has been durably persisted).
+    for (PubsubEvent event : events) {
+      eventSink.post(event);
     }
+
+    return success;
   }
 
   @Override
   public void deleteTasks(final Set<String> taskIds) {
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(final MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
-        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
-        taskStore.deleteTasks(taskIds);
+        eventSink.post(deleteTasks(storeProvider, taskIds));
       }
     });
   }
 
-  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-    if (task != null) {
-      return createStateMachine(task, task.getStatus());
-    }
-
-    // The task is unknown, not present in storage.
-    TaskStateMachine stateMachine = new TaskStateMachine(
-        taskId,
-        null,
-        workSink,
-        clock,
-        INIT);
-    stateMachine.updateState(UNKNOWN);
-    return stateMachine;
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task) {
-    return createStateMachine(task, INIT);
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-    return new TaskStateMachine(
-        Tasks.id(task),
-        task,
-        workSink,
-        clock,
-        initialState);
+  private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+    TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+    Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+    taskStore.deleteTasks(taskIds);
+    return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 11d283d..ebccc74 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.twitter.common.base.Closure;
 import com.twitter.common.base.Closures;
 import com.twitter.common.base.Command;
 import com.twitter.common.base.MorePreconditions;
 import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.StateMachine;
 import com.twitter.common.util.StateMachine.Rule;
 import com.twitter.common.util.StateMachine.Transition;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
 /**
  * State machine for a task.
  * <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * to different state transitions.  These responses are externally communicated by populating a
  * provided work queue.
  * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- *     abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to enforce its one-time-use nature.  This is probably best
+ * done by hiding the constructor and exposing only a static function that transitions a task and
+ * returns the resulting actions.
  */
 class TaskStateMachine {
   private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,184 +80,104 @@ class TaskStateMachine {
   private static final AtomicLong ILLEGAL_TRANSITIONS =
       Stats.exportLong("scheduler_illegal_task_state_transitions");
 
-  // Re-declarations of statuses as wrapped state objects.
-  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
-  private static final State FAILED = State.create(ScheduleStatus.FAILED);
-  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
-  private static final State INIT = State.create(ScheduleStatus.INIT);
-  private static final State KILLED = State.create(ScheduleStatus.KILLED);
-  private static final State KILLING = State.create(ScheduleStatus.KILLING);
-  private static final State LOST = State.create(ScheduleStatus.LOST);
-  private static final State PENDING = State.create(ScheduleStatus.PENDING);
-  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
-  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
-  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
-  private static final State STARTING = State.create(ScheduleStatus.STARTING);
-  private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
-  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
-  @VisibleForTesting
-  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
-        }
-      });
-
-  private final String taskId;
-  private final WorkSink workSink;
-  private final StateMachine<State> stateMachine;
+  private final StateMachine<ScheduleStatus> stateMachine;
   private ScheduleStatus previousState = null;
-  private final Clock clock;
-
-  /**
-   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
-   * States must be equal for them to be considered equal.
-   */
-  private static class State {
-    private final ScheduleStatus state;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-      this.state = state;
-      this.mutation = mutation;
-    }
-
-    static State create(ScheduleStatus status) {
-      return create(status, Functions.<IScheduledTask>identity());
-    }
-
-    static State create(
-        ScheduleStatus status,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      return new State(status, mutation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof State)) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      State other = (State) o;
-      return state == other.state;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder()
-          .append(state)
-          .toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return state.toString();
-    }
 
-    private ScheduleStatus getState() {
-      return state;
-    }
-
-    private Function<IScheduledTask, IScheduledTask> getMutation() {
-      return mutation;
-    }
-  }
+  private final Set<SideEffect> sideEffects = Sets.newHashSet();
 
   /**
-   * A write-only work acceptor.
+   * Creates a new task state machine representing a non-existent task.  This allows for consistent
+   * state-reconciliation actions when the external system disagrees with the scheduler.
+   *
+   * @param name Name of the state machine, for logging.
    */
-  public interface WorkSink {
-    /**
-     * Appends external work that must be performed for a state machine transition to be fully
-     * complete.
-     *
-     * @param work Description of the work to be performed.
-     * @param stateMachine The state machine that the work is associated with.
-     * @param mutation Mutate operation to perform along with the state transition.
-     */
-    void addWork(
-        WorkCommand work,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation);
+  public TaskStateMachine(String name) {
+    this(name, Optional.<IScheduledTask>absent());
   }
 
   /**
-   * Creates a new task state machine.
-   *
-   * @param taskId ID of the task managed by this state machine.
+   * Creates a new task state machine representing an existing task.  The state machine will be
+   * named with the task's ID.
+   *
    * @param task Read-only task that this state machine manages.
-   * @param workSink Work sink to receive transition response actions
-   * @param clock Clock to use for reading the current time.
-   * @param initialState The state to begin the state machine at.  All legal transitions will be
-   *     added, but this allows the state machine to 'skip' states, for instance when a task is
-   *     loaded from a persistent store.
    */
-  public TaskStateMachine(
-      final String taskId,
-      final IScheduledTask task,
-      final WorkSink workSink,
-      final Clock clock,
-      final ScheduleStatus initialState) {
-
-    this.taskId = MorePreconditions.checkNotBlank(taskId);
-    this.workSink = checkNotNull(workSink);
-    this.clock = checkNotNull(clock);
-    checkNotNull(initialState);
-
-    @SuppressWarnings("unchecked")
-    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
-        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
-        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
-            addWorkClosure(WorkCommand.KILL)),
-        /* Remove a terminated task that is remotely removed. */
-        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
-    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
-      @SuppressWarnings("fallthrough")
-      @Override public void execute(Transition<State> transition) {
-        switch (transition.getTo().getState()) {
-          case ASSIGNED:
-          case STARTING:
-          case RUNNING:
-            addWork(WorkCommand.KILL);
-            break;
-
-          case LOST:
-            addWork(WorkCommand.KILL);
-            // fall through
-
-          case FINISHED:
-          case FAILED:
-          case KILLED:
-            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
-            break;
-
-          case UNKNOWN:
-            updateState(ScheduleStatus.LOST);
-            break;
-
-          default:
-            // No-op.
-        }
-      }
-    };
+  public TaskStateMachine(IScheduledTask task) {
+    this(Tasks.id(task), Optional.of(task));
+  }
+
+  private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+    MorePreconditions.checkNotBlank(name);
+    checkNotNull(task);
+
+    final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+    if (task.isPresent()) {
+      Preconditions.checkState(
+          initialState != UNKNOWN,
+          "A task that exists may not be in UNKNOWN state.");
+    } else {
+      Preconditions.checkState(
+          initialState == UNKNOWN,
+          "A task that does not exist must start un UNKNOWN state.");
+    }
+
+    Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+        ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+            // Kill a task that we believe to be terminated when an attempt is made to revive.
+            .add(
+                Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
+                addFollowupClosure(KILL)))
+            // Remove a terminated task that is remotely removed.
+            .add(Closures.filter(Transition.to(UNKNOWN), addFollowupClosure(DELETE)))
+            .build());
+
+    final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+        new Closure<Transition<ScheduleStatus>>() {
+          @Override public void execute(Transition<ScheduleStatus> transition) {
+            switch (transition.getTo()) {
+              case ASSIGNED:
+                addFollowup(KILL);
+                break;
+
+              case STARTING:
+                addFollowup(KILL);
+                break;
+
+              case RUNNING:
+                addFollowup(KILL);
+                break;
+
+              case LOST:
+                addFollowup(KILL);
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FINISHED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FAILED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case KILLED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case UNKNOWN:
+                addFollowupTransition(LOST);
+                break;
+
+              default:
+                // No-op.
+            }
+          }
+        };
 
     // To be called on a task transitioning into the FINISHED state.
     final Command rescheduleIfService = new Command() {
       @Override public void execute() {
-        if (task.getAssignedTask().getTask().isIsService()) {
-          addWork(WorkCommand.RESCHEDULE);
+        if (task.get().getAssignedTask().getTask().isIsService()) {
+          addFollowup(RESCHEDULE);
         }
       }
     };
@@ -249,27 +185,29 @@ class TaskStateMachine {
     // To be called on a task transitioning into the FAILED state.
     final Command incrementFailuresMaybeReschedule = new Command() {
       @Override public void execute() {
-        addWork(WorkCommand.INCREMENT_FAILURES);
+        addFollowup(INCREMENT_FAILURES);
 
         // Max failures is ignored for service task.
-        boolean isService = task.getAssignedTask().getTask().isIsService();
+        boolean isService = task.get().getAssignedTask().getTask().isIsService();
 
         // Max failures is ignored when set to -1.
-        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
-        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
-          addWork(WorkCommand.RESCHEDULE);
+        int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+        boolean belowMaxFailures =
+            (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+        if (isService || belowMaxFailures) {
+          addFollowup(RESCHEDULE);
         } else {
-          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+          LOG.info("Task " + name + " reached failure limit, not rescheduling");
         }
       }
     };
 
-    final Closure<Transition<State>> deleteIfKilling =
-        Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+    final Closure<Transition<ScheduleStatus>> deleteIfKilling =
+        Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
 
-    stateMachine = StateMachine.<State>builder(taskId)
+    stateMachine = StateMachine.<ScheduleStatus>builder(name)
         .logTransitions()
-        .initialState(State.create(initialState))
+        .initialState(initialState)
         .addState(
             Rule.from(INIT)
                 .to(PENDING, THROTTLED, UNKNOWN))
@@ -286,16 +224,15 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
                     KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -303,25 +240,24 @@ class TaskStateMachine {
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            // fall through
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(RESCHEDULE);
+                            addFollowup(KILL);
                             break;
 
-                          case UNKNOWN:
+                          case KILLING:
+                            addFollowup(KILL);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -330,20 +266,19 @@ class TaskStateMachine {
             Rule.from(STARTING)
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -351,25 +286,25 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
                             // The slave previously acknowledged that it had the task, and now
                             // stopped reporting it.
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -378,20 +313,19 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -399,19 +333,19 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
                            default:
@@ -455,23 +389,26 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<State>>() {
-              @Override public void execute(final Transition<State> transition) {
-                ScheduleStatus from = transition.getFrom().getState();
-                ScheduleStatus to = transition.getTo().getState();
-
-                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
-                    // Prevent an update when killing a pending task, since the task is deleted
-                    // prior to the update.
-                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
-                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
-                } else if (!transition.isAllowed()) {
-                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
-                  ILLEGAL_TRANSITIONS.incrementAndGet();
-                }
-
+            new Closure<Transition<ScheduleStatus>>() {
+              @Override public void execute(final Transition<ScheduleStatus> transition) {
                 if (transition.isValidStateChange()) {
+                  ScheduleStatus from = transition.getFrom();
+                  ScheduleStatus to = transition.getTo();
+
+                  // TODO(wfarner): Clean up this hack.  This is here to suppress unnecessary work
+                  // (save followed by delete), but it shows a wart with this catch-all behavior.
+                  // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+                  boolean pendingDeleteHack =
+                      !(((from == PENDING) || (from == THROTTLED)) && (to == KILLING));
+
+                  // Don't bother saving state of a task that is being removed.
+                  if ((to != UNKNOWN) && pendingDeleteHack) {
+                    addFollowup(SAVE_STATE);
+                  }
                   previousState = from;
+                } else {
+                  LOG.severe("Illegal state transition attempted: " + transition);
+                  ILLEGAL_TRANSITIONS.incrementAndGet();
                 }
               }
             }
@@ -485,56 +422,25 @@ class TaskStateMachine {
         .build();
   }
 
-  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-    return new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> item) {
-        addWork(work);
-      }
-    };
-  }
-
-  private void addWork(WorkCommand work) {
-    addWork(work, Functions.<IScheduledTask>identity());
+  private void addFollowup(Action action) {
+    addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
   }
 
-  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-    LOG.info("Adding work command " + work + " for " + this);
-    workSink.addWork(work, TaskStateMachine.this, mutation);
+  private void addFollowupTransition(ScheduleStatus status) {
+    addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status) {
-    return updateState(status, Functions.<IScheduledTask>identity());
+  private void addFollowup(SideEffect sideEffect) {
+    LOG.info("Adding work command " + sideEffect + " for " + this);
+    sideEffects.add(sideEffect);
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The (optional) audit message to associate with the transition.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
-    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
-  }
-
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
-   *
-   * @param status Status to apply to the task.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation) {
-
-    return updateState(status, mutation, Optional.<String>absent());
+  private Closure<Transition<ScheduleStatus>> addFollowupClosure(final Action action) {
+    return new Closure<Transition<ScheduleStatus>>() {
+      @Override public void execute(Transition<ScheduleStatus> item) {
+        addFollowup(action);
+      }
+    };
   }
 
   /**
@@ -543,59 +449,35 @@ class TaskStateMachine {
    * will be appended to the work queue.
    *
    * @param status Status to apply to the task.
-   * @param auditMessage The audit message to associate with the transition.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
+   * @return Whether the state change was allowed, and the side effects to perform as a result.
    */
-  private synchronized boolean updateState(
-      final ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation,
-      final Optional<String> auditMessage) {
-
+  public synchronized TransitionResult updateState(final ScheduleStatus status) {
     checkNotNull(status);
-    checkNotNull(mutation);
-    checkNotNull(auditMessage);
+    Preconditions.checkState(sideEffects.isEmpty());
 
     /**
      * Don't bother applying noop state changes.  If we end up modifying task state without a
      * state transition (e.g. storing resource consumption of a running task), we need to find
      * a different way to suppress noop transitions.
      */
-    if (stateMachine.getState().getState() != status) {
-      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
-          new Function<IScheduledTask, IScheduledTask>() {
-            @Override public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask builder = task.newBuilder();
-              builder.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(status)
-                  .setMessage(auditMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(builder);
-            }
-          });
-      return stateMachine.transition(State.create(status, operation));
+    if (stateMachine.getState() == status) {
+      return new TransitionResult(false, ImmutableSet.<SideEffect>of());
     }
 
-    return false;
+    boolean success = stateMachine.transition(status);
+    ImmutableSet<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+    sideEffects.clear();
+    return new TransitionResult(success, transitionEffects);
   }
 
   /**
    * Fetch the current state from the state machine.
+   * TODO(wfarner): Consider removing this; the caller should already know the state.
    *
    * @return The current state.
    */
   public synchronized ScheduleStatus getState() {
-    return stateMachine.getState().getState();
-  }
-
-  /**
-   * Gets the ID for the task that this state machine manages.
-   *
-   * @return The state machine's task ID.
-   */
-  public String getTaskId() {
-    return taskId;
+    return stateMachine.getState();
   }
 
   /**
@@ -611,6 +493,6 @@ class TaskStateMachine {
 
   @Override
   public String toString() {
-    return getTaskId();
+    return stateMachine.getName();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java b/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
new file mode 100644
index 0000000..15174bd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TransitionResult.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * The actions that should be performed in response to a state transition attempt.
+ *
+ * @see TaskStateMachine
+ */
+public class TransitionResult {
+  private final boolean success;
+  private final ImmutableSet<SideEffect> sideEffects;
+
+  /**
+   * Creates a transition result with the given side effects.
+   *
+   * @param success Whether the transition attempt relevant to this result was successful.
+   * @param sideEffects Actions that must be performed in response to the state transition.
+   */
+  public TransitionResult(boolean success, ImmutableSet<SideEffect> sideEffects) {
+    this.success = success;
+    this.sideEffects = Preconditions.checkNotNull(sideEffects);
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public ImmutableSet<SideEffect> getSideEffects() {
+    return sideEffects;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TransitionResult)) {
+      return false;
+    }
+
+    TransitionResult other = (TransitionResult) o;
+    return (success == other.success)
+        && Objects.equal(sideEffects, other.sideEffects);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(success, sideEffects);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("success", success)
+        .add("sideEffects", sideEffects)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
-  // Send an instruction for the runner of this task to kill the task.
-  KILL,
-  // Create a new state machine with a copy of this task.
-  RESCHEDULE,
-  // Update the task's state (schedule status) in the persistent store to match the state machine.
-  UPDATE_STATE,
-  // Delete this task from the persistent store.
-  DELETE,
-  // Increment the failure count for this task.
-  INCREMENT_FAILURES
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 53d0c85..984f506 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -198,6 +198,10 @@ public interface Storage {
 
   /**
    * Executes the unit of mutating {@code work}.
+   * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+   * performed after completion of the outer-most transaction.  As it stands, it's somewhat
+   * futile to try to achieve this within a transaction, since the local code does not know
+   * if the current transaction is nested.
    *
    * @param work The unit of work to execute.
    * @param <T> The type of result this unit of work produces.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
     /**
      * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
      * skipped, and no corresponding task will be returned.
+     * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
+     * and it prevents the caller from worrying about a bad query having broad impact.
      *
      * @param query Query to match tasks against.
      * @param mutator The mutate operation.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3870df32/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 5379300..a0525e5 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -51,7 +52,6 @@ import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,6 +59,7 @@ import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
@@ -67,6 +68,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StateManagerImplTest extends EasyMockTest {
@@ -100,11 +102,6 @@ public class StateManagerImplTest extends EasyMockTest {
         rescheduleCalculator);
   }
 
-  @After
-  public void validateCompletion() {
-    assertTrue(stateManager.getStorage().getEvents().isEmpty());
-  }
-
   private static class StateChangeMatcher implements IArgumentMatcher {
     private final String taskId;
     private final ScheduleStatus from;
@@ -181,7 +178,7 @@ public class StateManagerImplTest extends EasyMockTest {
         .setStatus(PENDING)
         .setTaskEvents(ImmutableList.of(new TaskEvent()
             .setTimestamp(clock.nowMillis())
-            .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+            .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
             .setStatus(PENDING)))
         .setAssignedTask(new AssignedTask()
             .setInstanceId(3)
@@ -294,6 +291,92 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(unknownTask, RUNNING);
   }
 
+  private void noFlappingPenalty() {
+    expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+        .andReturn(0L);
+  }
+
+  @Test
+  public void testIncrementFailureCount() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    noFlappingPenalty();
+    expectStateTransitions(taskId2, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, FAILED);
+    IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+    assertEquals(1, rescheduledTask.getFailureCount());
+  }
+
+  @Test
+  public void testDoubleTransition() {
+    // Tests that a transition inducing another transition (STATE_CHANGE action) is performed.
+
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    noFlappingPenalty();
+    expectStateTransitions(taskId2, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, UNKNOWN);
+  }
+
+  @Test
+  public void testCasTaskPresent() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A);
+    assertFalse(stateManager.changeState(
+        taskId,
+        Optional.of(PENDING),
+        RUNNING,
+        Optional.<String>absent()));
+    assertTrue(stateManager.changeState(
+        taskId,
+        Optional.of(ASSIGNED),
+        FAILED,
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testCasTaskNotFound() {
+    control.replay();
+
+    assertFalse(stateManager.changeState(
+        "a",
+        Optional.of(PENDING),
+        ASSIGNED,
+        Optional.<String>absent()));
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,