You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/02 02:11:27 UTC

git commit: Add an InstanceUpdater to fit into rolling update coordination in the scheduler.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master b0914157c -> 54a096279


Add an InstanceUpdater to fit into rolling update coordination in the scheduler.

Bugs closed: AURORA-621

Reviewed at https://reviews.apache.org/r/24078/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/54a09627
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/54a09627
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/54a09627

Branch: refs/heads/master
Commit: 54a096279a836d1e8098207e1ad943c77fb4a7f2
Parents: b091415
Author: Bill Farner <wf...@apache.org>
Authored: Fri Aug 1 17:08:29 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Aug 1 17:08:29 2014 -0700

----------------------------------------------------------------------
 .../scheduler/updater/InstanceUpdater.java      | 220 ++++++++++++
 .../scheduler/updater/TaskController.java       |  54 +++
 .../scheduler/updater/InstanceUpdaterTest.java  | 343 +++++++++++++++++++
 3 files changed, 617 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
new file mode 100644
index 0000000..2863517
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.logging.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+
+/**
+ * In part of a job update, this manages the update of an individual instance. This includes
+ * deciding how to effect an update from a possibly-absent old configuration to a possibly-absent
+ * new configuration, and detecting whether a replaced instance becomes unstable.
+ *
+ * TODO(wfarner): This probably needs to be parameterized so that it may be reused for rollbacks.
+ */
+class InstanceUpdater {
+  private static final Logger LOG = Logger.getLogger(InstanceUpdater.class.getName());
+
+  private final Optional<ITaskConfig> desiredState;
+  private final int toleratedFailures;
+  private final Amount<Long, Time> minRunningTime;
+  private final Amount<Long, Time> maxNonRunningTime;
+  private final Clock clock;
+
+  /**
+   * Keep an optional controller reference so that we may discard the reference after we've
+   * advertised completion through {@link TaskController#updateCompleted(Result) updateCompleted}.
+   * This gives us a signal to no-op (when this is reset to absent), and ensures we can't send any
+   * control signals from that point on.
+   */
+  private Optional<TaskController> controllerRef = Optional.absent();
+
+  private int observedFailures = 0;
+
+  InstanceUpdater(
+      Optional<ITaskConfig> desiredState,
+      int toleratedFailures,
+      Amount<Long, Time> minRunningTime,
+      Amount<Long, Time> maxNonRunningTime,
+      Clock clock,
+      final TaskController controller) {
+
+    this.desiredState = requireNonNull(desiredState);
+    this.toleratedFailures = toleratedFailures;
+    this.minRunningTime = requireNonNull(minRunningTime);
+    this.maxNonRunningTime = requireNonNull(maxNonRunningTime);
+    this.clock = requireNonNull(clock);
+    this.controllerRef = Optional.of(controller);
+  }
+
+  private long millisSince(ITaskEvent event) {
+    return clock.nowMillis() - event.getTimestamp();
+  }
+
+  private boolean appearsStable(IScheduledTask task) {
+    return millisSince(Tasks.getLatestEvent(task)) >= minRunningTime.as(Time.MILLISECONDS);
+  }
+
+  private boolean appearsStuck(IScheduledTask task) {
+    // Walk task events backwards to find the first event, or first non-running event.
+    ITaskEvent earliestNonRunningEvent = task.getTaskEvents().get(0);
+    for (ITaskEvent event : Lists.reverse(task.getTaskEvents())) {
+      if (event.getStatus() == RUNNING) {
+        break;
+      } else {
+        earliestNonRunningEvent = event;
+      }
+    }
+
+    return millisSince(earliestNonRunningEvent) >= maxNonRunningTime.as(Time.MILLISECONDS);
+  }
+
+  private boolean permanentlyKilled(IScheduledTask task) {
+    return Iterables.any(
+        task.getTaskEvents(),
+        Predicates.compose(Predicates.equalTo(ScheduleStatus.KILLING), Tasks.TASK_EVENT_TO_STATUS));
+  }
+
+  private static boolean isKillable(ScheduleStatus status) {
+    return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
+  }
+
+  private void completed(Result status) {
+    controllerRef.get().updateCompleted(status);
+    controllerRef = Optional.absent();
+  }
+
+  /**
+   * Evaluates the state differences between the originally-provided {@code desiredState} and the
+   * provided {@code actualState}, and invokes any necessary actions on the provided
+   * {@link TaskController}.
+   * <p>
+   * This function should be idempotent, with the exception of an internal failure counter that
+   * increments when an updating task exits, or an active but not
+   * {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
+   *
+   * <p>
+   * It is the reponsibility of the caller to ensure that the {@code actualState} is the latest
+   * value.  Note: the caller should avoid calling this when a terminal task is moving to another
+   * terminal state.  It should also suppress deletion events for tasks that have been replaced by
+   * an active task.
+   *
+   * @param actualState The actual observed state of the task.
+   */
+  synchronized void evaluate(Optional<IScheduledTask> actualState) {
+    if (!controllerRef.isPresent()) {
+      // Avoid any further action if a result was already given.
+      return;
+    }
+
+    TaskController controller = controllerRef.get();
+
+    boolean desiredPresent = desiredState.isPresent();
+    boolean actualPresent = actualState.isPresent();
+
+    if (desiredPresent && actualPresent) {
+      // The update is changing the task configuration.
+      handleActualAndDesiredPresent(actualState.get());
+    } else if (desiredPresent) {
+      // The update is introducing a new instance.
+      controller.addReplacement();
+    } else if (actualPresent) {
+      // The update is removing an instance.
+      if (isKillable(actualState.get().getStatus())) {
+        controller.killTask();
+      }
+    } else {
+      // No-op update.
+      completed(SUCCESS);
+    }
+  }
+
+  private boolean addFailureAndCheckIfFailed() {
+    LOG.info("Observed updated task failure.");
+    observedFailures++;
+    if (observedFailures > toleratedFailures) {
+      completed(FAILED);
+      return true;
+    }
+    return false;
+  }
+
+  private void handleActualAndDesiredPresent(IScheduledTask actualState) {
+    Preconditions.checkState(desiredState.isPresent());
+    Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
+
+    TaskController controller = controllerRef.get();
+
+    ScheduleStatus status = actualState.getStatus();
+    if (desiredState.get().equals(actualState.getAssignedTask().getTask())) {
+      // The desired task is in the system.
+      if (status == RUNNING) {
+        // The desired task is running.
+        if (appearsStable(actualState)) {
+          // Stably running, our work here is done.
+          completed(SUCCESS);
+        } else {
+          // Not running long enough to consider stable, check again later.
+          controller.reevaluteAfterRunningLimit();
+        }
+      } else if (Tasks.isTerminated(status)) {
+        // The desired task has terminated, this is a failure.
+        addFailureAndCheckIfFailed();
+      } else if (appearsStuck(actualState)) {
+        // The task is not running, but not terminated, and appears to have been in this state
+        // long enough that we should intervene.
+        if (!addFailureAndCheckIfFailed()) {
+          controller.killTask();
+        }
+      } else {
+        // The task is in a transient state on the way into or out of running, check back later.
+        controller.reevaluteAfterRunningLimit();
+      }
+    } else {
+      // This is not the configuration that we would like to run.
+      if (isKillable(status)) {
+        // Task is active, kill it.
+        controller.killTask();
+      } else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
+        // The old task has exited, it is now safe to add the new one.
+        controller.addReplacement();
+      }
+    }
+  }
+
+  enum Result {
+    SUCCESS,
+    FAILED
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
new file mode 100644
index 0000000..b725a31
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Controls that an instance updater uses to modify the job instance it is updating, or advertise
+ * that it has reached a result.
+ */
+interface TaskController {
+  /**
+   * Terminates the instance, ensuring that it is not automatically restarted.
+   * <p>
+   * If the implementation wishes to time out on the kill attempt, it should schedule an action
+   * to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a delay.
+   */
+  void killTask();
+
+  /**
+   * Adds a task with the new configuration to replace the old task.
+   * <p>
+   * If the implementation wishes to time out on the task replacement (such as to deal with a
+   * task stuck in {@link org.apache.aurora.gen.ScheduleStatus#PENDING}, it should schedule an
+   * action to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a
+   * delay.
+   */
+  void addReplacement();
+
+  /**
+   * Requests that the updater be re-evaluated after the amount of time a task must remain in
+   * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING} to be considered stable. The re-evaluation
+   * should be after the same amount of time as {@code minRunningTime}.
+   */
+  void reevaluteAfterRunningLimit();
+
+  /**
+   * Announces a result of the attempt to update the instance.
+   * <p>
+   * Once this callback has been made, the updater will no-op on any subsequent evaluations.
+   *
+   * @param status Update result.
+   */
+  void updateCompleted(InstanceUpdater.Result status);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
new file mode 100644
index 0000000..a845363
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class InstanceUpdaterTest extends EasyMockTest {
+  private static final Optional<ITaskConfig> NO_CONFIG = Optional.absent();
+  private static final Optional<IScheduledTask> NO_TASK = Optional.absent();
+
+  private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig().setNumCpus(1.0));
+  private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig().setNumCpus(2.0));
+
+  private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Long, Time> MAX_NON_RUNNING_TIME = Amount.of(5L, Time.MINUTES);
+
+  private FakeClock clock;
+  private TaskController controller;
+  private InstanceUpdater updater;
+
+  @Before
+  public void setUp() {
+    clock = new FakeClock();
+    controller = createMock(TaskController.class);
+  }
+
+  private void newUpdater(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
+    updater = new InstanceUpdater(
+        newConfig,
+        maxToleratedFailures,
+        MIN_RUNNING_TIME,
+        MAX_NON_RUNNING_TIME,
+        clock,
+        controller);
+  }
+
+  private void newUpdater(ITaskConfig newConfig, int maxToleratedFailures) {
+    newUpdater(Optional.of(newConfig), maxToleratedFailures);
+  }
+
+  private IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
+    List<TaskEvent> events = Lists.newArrayList();
+    if (status != PENDING) {
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+    }
+    if (Tasks.isTerminated(status) || status == KILLING) {
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
+      events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+    }
+
+    events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+
+    return IScheduledTask.build(
+        new ScheduledTask()
+            .setStatus(status)
+            .setTaskEvents(ImmutableList.copyOf(events))
+            .setAssignedTask(
+                new AssignedTask()
+                    .setTask(config.newBuilder())));
+  }
+
+  private void evaluate(ITaskConfig config, ScheduleStatus status, ScheduleStatus... statuses) {
+    IScheduledTask task = makeTask(config, status);
+
+    updater.evaluate(Optional.of(task));
+    for (ScheduleStatus s : statuses) {
+      ScheduledTask builder = task.newBuilder();
+      builder.setStatus(s);
+      builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(s));
+      task = IScheduledTask.build(builder);
+      updater.evaluate(Optional.of(task));
+    }
+  }
+
+  @Test
+  public void testSuccessfulUpdate() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(4);
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    updater.evaluate(Optional.of(task));
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+  }
+
+  @Test
+  public void testUpdateRetryOnTaskExit() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(8);
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    updater.evaluate(Optional.of(task));
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+  }
+
+  @Test
+  public void testUpdateRetryFailure() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(4);
+    controller.updateCompleted(Result.FAILED);
+
+    control.replay();
+
+    newUpdater(NEW, 0);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+  }
+
+  @Test
+  public void testNoopUpdate() {
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+  }
+
+  @Test
+  public void testPointlessUpdate() {
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NO_CONFIG, 1);
+    updater.evaluate(NO_TASK);
+  }
+
+  @Test
+  public void testNoOldConfig() {
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(4);
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    updater.evaluate(NO_TASK);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    updater.evaluate(Optional.of(task));
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+  }
+
+  @Test
+  public void testNoNewConfig() {
+    controller.killTask();
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NO_CONFIG, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    updater.evaluate(NO_TASK);
+  }
+
+  @Test
+  public void testAvoidsMultipleResults() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(4);
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    updater.evaluate(Optional.of(task));
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+    // The extra evaluation should not result in another updateCompleted().
+    updater.evaluate(Optional.of(task));
+  }
+
+  @Test
+  public void testStuckInPending() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(2);
+    controller.killTask();
+    controller.addReplacement();
+    controller.updateCompleted(Result.FAILED);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING);
+    IScheduledTask task1 = makeTask(NEW, PENDING);
+    clock.advance(MAX_NON_RUNNING_TIME);
+    updater.evaluate(Optional.of(task1));
+    updater.evaluate(NO_TASK);
+    evaluate(NEW, PENDING);
+    IScheduledTask task2 = makeTask(NEW, PENDING);
+    clock.advance(MAX_NON_RUNNING_TIME);
+    updater.evaluate(Optional.of(task2));
+  }
+
+  @Test
+  public void testStuckInKilling() {
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(6);
+    controller.killTask();
+    controller.addReplacement();
+    controller.updateCompleted(Result.FAILED);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+    evaluate(OLD, RUNNING, KILLING, FINISHED);
+    evaluate(NEW, PENDING);
+    IScheduledTask task1 = makeTask(NEW, PENDING);
+    clock.advance(MAX_NON_RUNNING_TIME);
+    updater.evaluate(Optional.of(task1));
+    updater.evaluate(NO_TASK);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING);
+    IScheduledTask task2 = makeTask(NEW, KILLING);
+    updater.evaluate(Optional.of(task2));
+    clock.advance(MAX_NON_RUNNING_TIME);
+    updater.evaluate(Optional.of(task2));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidInput() {
+    control.replay();
+
+    newUpdater(NEW, 1);
+    IScheduledTask noEvents = IScheduledTask.build(
+        makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of()));
+    updater.evaluate(Optional.of(noEvents));
+  }
+
+  @Test
+  public void testOldTaskDies() {
+    // If the original task dies, the updater should not add a replacement if the task will be
+    // resuscitated.  Only a task that has passed through KILLING will not be resuscitated.
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+
+    // Task did not pass through KILLING, therefore will be rescheudled.
+    evaluate(OLD, FINISHED);
+  }
+
+  @Test
+  public void testOldTaskDiesAndRescheduled() {
+    // Identical to testOldTaskDies, with the follow-through of rescheduling and updating.
+    // If the original task dies, the updater should not add a replacement if the task will be
+    // resuscitated.  Only a task that has passed through KILLING will not be resuscitated.
+
+    controller.killTask();
+    controller.addReplacement();
+    controller.reevaluteAfterRunningLimit();
+    expectLastCall().times(4);
+    controller.updateCompleted(SUCCESS);
+
+    control.replay();
+
+    newUpdater(NEW, 1);
+
+    // Task did not pass through KILLING, therefore will be rescheudled.
+    evaluate(OLD, FINISHED);
+    evaluate(OLD, PENDING);
+    updater.evaluate(NO_TASK);
+    evaluate(NEW, PENDING, ASSIGNED, STARTING);
+    IScheduledTask task = makeTask(NEW, RUNNING);
+    updater.evaluate(Optional.of(task));
+    clock.advance(MIN_RUNNING_TIME);
+    updater.evaluate(Optional.of(task));
+  }
+}