You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/06 21:33:16 UTC

git commit: Refactor InstanceUpdater to remove the callback interface.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 7f21e3f41 -> dd91370f8


Refactor InstanceUpdater to remove the callback interface.

Reviewed at https://reviews.apache.org/r/24357/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/dd91370f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/dd91370f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/dd91370f

Branch: refs/heads/master
Commit: dd91370f8e4d5093a2b01ae5294853b400dcec78
Parents: 7f21e3f
Author: Bill Farner <wf...@apache.org>
Authored: Wed Aug 6 12:29:36 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Aug 6 12:29:36 2014 -0700

----------------------------------------------------------------------
 .../scheduler/updater/InstanceUpdater.java      |  90 ++---
 .../scheduler/updater/TaskController.java       |  54 ---
 .../scheduler/updater/InstanceUpdaterTest.java  | 390 ++++++++-----------
 .../aurora/scheduler/updater/TaskUtil.java      |  64 +++
 4 files changed, 260 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index 2863517..7476d82 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -33,8 +33,12 @@ import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
 
 /**
  * In part of a job update, this manages the update of an individual instance. This includes
@@ -52,14 +56,6 @@ class InstanceUpdater {
   private final Amount<Long, Time> maxNonRunningTime;
   private final Clock clock;
 
-  /**
-   * Keep an optional controller reference so that we may discard the reference after we've
-   * advertised completion through {@link TaskController#updateCompleted(Result) updateCompleted}.
-   * This gives us a signal to no-op (when this is reset to absent), and ensures we can't send any
-   * control signals from that point on.
-   */
-  private Optional<TaskController> controllerRef = Optional.absent();
-
   private int observedFailures = 0;
 
   InstanceUpdater(
@@ -67,15 +63,13 @@ class InstanceUpdater {
       int toleratedFailures,
       Amount<Long, Time> minRunningTime,
       Amount<Long, Time> maxNonRunningTime,
-      Clock clock,
-      final TaskController controller) {
+      Clock clock) {
 
     this.desiredState = requireNonNull(desiredState);
     this.toleratedFailures = toleratedFailures;
     this.minRunningTime = requireNonNull(minRunningTime);
     this.maxNonRunningTime = requireNonNull(maxNonRunningTime);
     this.clock = requireNonNull(clock);
-    this.controllerRef = Optional.of(controller);
   }
 
   private long millisSince(ITaskEvent event) {
@@ -110,72 +104,55 @@ class InstanceUpdater {
     return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
   }
 
-  private void completed(Result status) {
-    controllerRef.get().updateCompleted(status);
-    controllerRef = Optional.absent();
-  }
-
   /**
    * Evaluates the state differences between the originally-provided {@code desiredState} and the
-   * provided {@code actualState}, and invokes any necessary actions on the provided
-   * {@link TaskController}.
+   * provided {@code actualState}.
    * <p>
    * This function should be idempotent, with the exception of an internal failure counter that
    * increments when an updating task exits, or an active but not
    * {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
    *
    * <p>
-   * It is the reponsibility of the caller to ensure that the {@code actualState} is the latest
+   * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
    * value.  Note: the caller should avoid calling this when a terminal task is moving to another
    * terminal state.  It should also suppress deletion events for tasks that have been replaced by
    * an active task.
    *
    * @param actualState The actual observed state of the task.
+   * @return the evaluation result, including the state of the instance update, and a necessary
+   *         action to perform.
    */
-  synchronized void evaluate(Optional<IScheduledTask> actualState) {
-    if (!controllerRef.isPresent()) {
-      // Avoid any further action if a result was already given.
-      return;
-    }
-
-    TaskController controller = controllerRef.get();
-
+  synchronized Result evaluate(Optional<IScheduledTask> actualState) {
     boolean desiredPresent = desiredState.isPresent();
     boolean actualPresent = actualState.isPresent();
 
     if (desiredPresent && actualPresent) {
       // The update is changing the task configuration.
-      handleActualAndDesiredPresent(actualState.get());
+      return handleActualAndDesiredPresent(actualState.get());
     } else if (desiredPresent) {
       // The update is introducing a new instance.
-      controller.addReplacement();
+      return REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
     } else if (actualPresent) {
       // The update is removing an instance.
-      if (isKillable(actualState.get().getStatus())) {
-        controller.killTask();
-      }
+      return isKillable(actualState.get().getStatus())
+          ? KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE
+          : EVALUATE_ON_STATE_CHANGE;
     } else {
       // No-op update.
-      completed(SUCCESS);
+      return SUCCEEDED;
     }
   }
 
-  private boolean addFailureAndCheckIfFailed() {
+  private Result addFailureAndCheckIfFailed() {
     LOG.info("Observed updated task failure.");
     observedFailures++;
-    if (observedFailures > toleratedFailures) {
-      completed(FAILED);
-      return true;
-    }
-    return false;
+    return observedFailures > toleratedFailures ? FAILED : EVALUATE_ON_STATE_CHANGE;
   }
 
-  private void handleActualAndDesiredPresent(IScheduledTask actualState) {
+  private Result handleActualAndDesiredPresent(IScheduledTask actualState) {
     Preconditions.checkState(desiredState.isPresent());
     Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
 
-    TaskController controller = controllerRef.get();
-
     ScheduleStatus status = actualState.getStatus();
     if (desiredState.get().equals(actualState.getAssignedTask().getTask())) {
       // The desired task is in the system.
@@ -183,38 +160,45 @@ class InstanceUpdater {
         // The desired task is running.
         if (appearsStable(actualState)) {
           // Stably running, our work here is done.
-          completed(SUCCESS);
+          return SUCCEEDED;
         } else {
           // Not running long enough to consider stable, check again later.
-          controller.reevaluteAfterRunningLimit();
+          return EVALUATE_AFTER_RUNNING_LIMIT;
         }
       } else if (Tasks.isTerminated(status)) {
         // The desired task has terminated, this is a failure.
-        addFailureAndCheckIfFailed();
+        return addFailureAndCheckIfFailed();
       } else if (appearsStuck(actualState)) {
         // The task is not running, but not terminated, and appears to have been in this state
         // long enough that we should intervene.
-        if (!addFailureAndCheckIfFailed()) {
-          controller.killTask();
-        }
+        Result updaterStatus = addFailureAndCheckIfFailed();
+        return (updaterStatus == FAILED)
+            ? updaterStatus
+            : KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       } else {
         // The task is in a transient state on the way into or out of running, check back later.
-        controller.reevaluteAfterRunningLimit();
+        return EVALUATE_AFTER_RUNNING_LIMIT;
       }
     } else {
       // This is not the configuration that we would like to run.
       if (isKillable(status)) {
         // Task is active, kill it.
-        controller.killTask();
+        return Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       } else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
         // The old task has exited, it is now safe to add the new one.
-        controller.addReplacement();
+        return Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       }
     }
+
+    return EVALUATE_ON_STATE_CHANGE;
   }
 
   enum Result {
-    SUCCESS,
+    EVALUATE_ON_STATE_CHANGE,
+    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    EVALUATE_AFTER_RUNNING_LIMIT,
+    SUCCEEDED,
     FAILED
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
deleted file mode 100644
index b725a31..0000000
--- a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.updater;
-
-/**
- * Controls that an instance updater uses to modify the job instance it is updating, or advertise
- * that it has reached a result.
- */
-interface TaskController {
-  /**
-   * Terminates the instance, ensuring that it is not automatically restarted.
-   * <p>
-   * If the implementation wishes to time out on the kill attempt, it should schedule an action
-   * to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a delay.
-   */
-  void killTask();
-
-  /**
-   * Adds a task with the new configuration to replace the old task.
-   * <p>
-   * If the implementation wishes to time out on the task replacement (such as to deal with a
-   * task stuck in {@link org.apache.aurora.gen.ScheduleStatus#PENDING}, it should schedule an
-   * action to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a
-   * delay.
-   */
-  void addReplacement();
-
-  /**
-   * Requests that the updater be re-evaluated after the amount of time a task must remain in
-   * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING} to be considered stable. The re-evaluation
-   * should be after the same amount of time as {@code minRunningTime}.
-   */
-  void reevaluteAfterRunningLimit();
-
-  /**
-   * Announces a result of the attempt to update the instance.
-   * <p>
-   * Once this callback has been made, the updater will no-op on any subsequent evaluations.
-   *
-   * @param status Update result.
-   */
-  void updateCompleted(InstanceUpdater.Result status);
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index a845363..dda1b73 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -13,25 +13,18 @@
  */
 package org.apache.aurora.scheduler.updater;
 
-import java.util.List;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
-import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
@@ -42,12 +35,15 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
-import static org.easymock.EasyMock.expectLastCall;
-
-public class InstanceUpdaterTest extends EasyMockTest {
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.junit.Assert.assertEquals;
+
+public class InstanceUpdaterTest {
   private static final Optional<ITaskConfig> NO_CONFIG = Optional.absent();
-  private static final Optional<IScheduledTask> NO_TASK = Optional.absent();
 
   private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig().setNumCpus(1.0));
   private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig().setNumCpus(2.0));
@@ -55,250 +51,191 @@ public class InstanceUpdaterTest extends EasyMockTest {
   private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
   private static final Amount<Long, Time> MAX_NON_RUNNING_TIME = Amount.of(5L, Time.MINUTES);
 
-  private FakeClock clock;
-  private TaskController controller;
-  private InstanceUpdater updater;
-
-  @Before
-  public void setUp() {
-    clock = new FakeClock();
-    controller = createMock(TaskController.class);
-  }
+  private static class TestFixture {
+    private final FakeClock clock;
+    private final InstanceUpdater updater;
+    private final TaskUtil taskUtil;
+    private Optional<IScheduledTask> task = Optional.absent();
+
+    TestFixture(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
+      this.clock = new FakeClock();
+      this.updater = new InstanceUpdater(
+          newConfig,
+          maxToleratedFailures,
+          MIN_RUNNING_TIME,
+          MAX_NON_RUNNING_TIME,
+          clock);
+      this.taskUtil = new TaskUtil(clock);
+    }
 
-  private void newUpdater(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
-    updater = new InstanceUpdater(
-        newConfig,
-        maxToleratedFailures,
-        MIN_RUNNING_TIME,
-        MAX_NON_RUNNING_TIME,
-        clock,
-        controller);
-  }
+    TestFixture(ITaskConfig newConfig, int maxToleratedFailures) {
+      this(Optional.of(newConfig), maxToleratedFailures);
+    }
 
-  private void newUpdater(ITaskConfig newConfig, int maxToleratedFailures) {
-    newUpdater(Optional.of(newConfig), maxToleratedFailures);
-  }
+    void setActualState(ITaskConfig config) {
+      this.task = Optional.of(taskUtil.makeTask(config, PENDING));
+    }
 
-  private IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
-    List<TaskEvent> events = Lists.newArrayList();
-    if (status != PENDING) {
-      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+    void setActualStateAbsent() {
+      this.task = Optional.absent();
     }
-    if (Tasks.isTerminated(status) || status == KILLING) {
-      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
-      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+
+    private Result changeStatusAndEvaluate(ScheduleStatus status) {
+      ScheduledTask builder = task.get().newBuilder();
+      if (builder.getStatus() != status) {
+        // Only add a task event if this is a state change.
+        builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+      }
+      builder.setStatus(status);
+
+      task = Optional.of(IScheduledTask.build(builder));
+      return updater.evaluate(task);
     }
 
-    events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+    void evaluateCurrentState(Result expectedResult) {
+      assertEquals(expectedResult, updater.evaluate(task));
+    }
 
-    return IScheduledTask.build(
-        new ScheduledTask()
-            .setStatus(status)
-            .setTaskEvents(ImmutableList.copyOf(events))
-            .setAssignedTask(
-                new AssignedTask()
-                    .setTask(config.newBuilder())));
-  }
+    void evaluate(
+        Result expectedResult,
+        ScheduleStatus status,
+        ScheduleStatus... statuses) {
 
-  private void evaluate(ITaskConfig config, ScheduleStatus status, ScheduleStatus... statuses) {
-    IScheduledTask task = makeTask(config, status);
+      assertEquals(expectedResult, changeStatusAndEvaluate(status));
+      for (ScheduleStatus s : statuses) {
+        assertEquals(expectedResult, changeStatusAndEvaluate(s));
+      }
+    }
 
-    updater.evaluate(Optional.of(task));
-    for (ScheduleStatus s : statuses) {
-      ScheduledTask builder = task.newBuilder();
-      builder.setStatus(s);
-      builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(s));
-      task = IScheduledTask.build(builder);
-      updater.evaluate(Optional.of(task));
+    void advanceTime(Amount<Long, Time> time) {
+      clock.advance(time);
     }
   }
 
   @Test
   public void testSuccessfulUpdate() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(4);
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    updater.evaluate(Optional.of(task));
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluateCurrentState(SUCCEEDED);
   }
 
   @Test
   public void testUpdateRetryOnTaskExit() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(8);
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    updater.evaluate(Optional.of(task));
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, FAILED);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluateCurrentState(SUCCEEDED);
   }
 
   @Test
   public void testUpdateRetryFailure() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(4);
-    controller.updateCompleted(Result.FAILED);
-
-    control.replay();
-
-    newUpdater(NEW, 0);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    TestFixture f = new TestFixture(NEW, 0);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+    f.evaluate(Result.FAILED, FAILED);
   }
 
   @Test
   public void testNoopUpdate() {
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluate(SUCCEEDED, RUNNING);
   }
 
   @Test
   public void testPointlessUpdate() {
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NO_CONFIG, 1);
-    updater.evaluate(NO_TASK);
+    TestFixture f = new TestFixture(NO_CONFIG, 1);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(SUCCEEDED);
   }
 
   @Test
   public void testNoOldConfig() {
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(4);
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    updater.evaluate(NO_TASK);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    updater.evaluate(Optional.of(task));
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluateCurrentState(SUCCEEDED);
   }
 
   @Test
   public void testNoNewConfig() {
-    controller.killTask();
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NO_CONFIG, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    updater.evaluate(NO_TASK);
-  }
-
-  @Test
-  public void testAvoidsMultipleResults() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(4);
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    updater.evaluate(Optional.of(task));
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
-    // The extra evaluation should not result in another updateCompleted().
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NO_CONFIG, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING, FINISHED);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(SUCCEEDED);
   }
 
   @Test
   public void testStuckInPending() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(2);
-    controller.killTask();
-    controller.addReplacement();
-    controller.updateCompleted(Result.FAILED);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING);
-    IScheduledTask task1 = makeTask(NEW, PENDING);
-    clock.advance(MAX_NON_RUNNING_TIME);
-    updater.evaluate(Optional.of(task1));
-    updater.evaluate(NO_TASK);
-    evaluate(NEW, PENDING);
-    IScheduledTask task2 = makeTask(NEW, PENDING);
-    clock.advance(MAX_NON_RUNNING_TIME);
-    updater.evaluate(Optional.of(task2));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+    f.advanceTime(MAX_NON_RUNNING_TIME);
+    f.evaluateCurrentState(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+    f.advanceTime(MAX_NON_RUNNING_TIME);
+    f.evaluateCurrentState(Result.FAILED);
   }
 
   @Test
   public void testStuckInKilling() {
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(6);
-    controller.killTask();
-    controller.addReplacement();
-    controller.updateCompleted(Result.FAILED);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-    evaluate(OLD, RUNNING, KILLING, FINISHED);
-    evaluate(NEW, PENDING);
-    IScheduledTask task1 = makeTask(NEW, PENDING);
-    clock.advance(MAX_NON_RUNNING_TIME);
-    updater.evaluate(Optional.of(task1));
-    updater.evaluate(NO_TASK);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING);
-    IScheduledTask task2 = makeTask(NEW, KILLING);
-    updater.evaluate(Optional.of(task2));
-    clock.advance(MAX_NON_RUNNING_TIME);
-    updater.evaluate(Optional.of(task2));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+    f.advanceTime(MAX_NON_RUNNING_TIME);
+    f.evaluateCurrentState(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, ASSIGNED, STARTING, RUNNING);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, KILLING);
+    f.advanceTime(MAX_NON_RUNNING_TIME);
+    f.evaluateCurrentState(Result.FAILED);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testInvalidInput() {
-    control.replay();
-
-    newUpdater(NEW, 1);
-    IScheduledTask noEvents = IScheduledTask.build(
-        makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of()));
-    updater.evaluate(Optional.of(noEvents));
+    TestFixture f = new TestFixture(NEW, 1);
+    ScheduledTask noEvents = new TaskUtil(new FakeClock())
+        .makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of());
+    f.updater.evaluate(Optional.of(IScheduledTask.build(noEvents)));
   }
 
   @Test
@@ -306,12 +243,10 @@ public class InstanceUpdaterTest extends EasyMockTest {
     // If the original task dies, the updater should not add a replacement if the task will be
     // resuscitated.  Only a task that has passed through KILLING will not be resuscitated.
 
-    control.replay();
-
-    newUpdater(NEW, 1);
-
-    // Task did not pass through KILLING, therefore will be rescheudled.
-    evaluate(OLD, FINISHED);
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+    // Task did not pass through KILLING, therefore will be rescheduled.
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
   }
 
   @Test
@@ -320,24 +255,17 @@ public class InstanceUpdaterTest extends EasyMockTest {
     // If the original task dies, the updater should not add a replacement if the task will be
     // resuscitated.  Only a task that has passed through KILLING will not be resuscitated.
 
-    controller.killTask();
-    controller.addReplacement();
-    controller.reevaluteAfterRunningLimit();
-    expectLastCall().times(4);
-    controller.updateCompleted(SUCCESS);
-
-    control.replay();
-
-    newUpdater(NEW, 1);
-
-    // Task did not pass through KILLING, therefore will be rescheudled.
-    evaluate(OLD, FINISHED);
-    evaluate(OLD, PENDING);
-    updater.evaluate(NO_TASK);
-    evaluate(NEW, PENDING, ASSIGNED, STARTING);
-    IScheduledTask task = makeTask(NEW, RUNNING);
-    updater.evaluate(Optional.of(task));
-    clock.advance(MIN_RUNNING_TIME);
-    updater.evaluate(Optional.of(task));
+    TestFixture f = new TestFixture(NEW, 1);
+    f.setActualState(OLD);
+
+    // Task did not pass through KILLING, therefore will be rescheduled.
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, PENDING);
+    f.setActualStateAbsent();
+    f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    f.setActualState(NEW);
+    f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluateCurrentState(SUCCEEDED);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java b/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
new file mode 100644
index 0000000..0e67f91
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+
+final class TaskUtil {
+
+  private final FakeClock clock;
+
+  TaskUtil(FakeClock clock) {
+    this.clock = Objects.requireNonNull(clock);
+  }
+
+  IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
+    List<TaskEvent> events = Lists.newArrayList();
+    if (status != PENDING) {
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+    }
+    if (Tasks.isTerminated(status) || status == KILLING) {
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+    }
+
+    events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+
+    return IScheduledTask.build(
+        new ScheduledTask()
+            .setStatus(status)
+            .setTaskEvents(ImmutableList.copyOf(events))
+            .setAssignedTask(
+                new AssignedTask()
+                    .setTask(config.newBuilder())));
+  }
+}