You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/02 02:11:27 UTC
git commit: Add an InstanceUpdater to fit into rolling update
coordination in the scheduler.
Repository: incubator-aurora
Updated Branches:
refs/heads/master b0914157c -> 54a096279
Add an InstanceUpdater to fit into rolling update coordination in the scheduler.
Bugs closed: AURORA-621
Reviewed at https://reviews.apache.org/r/24078/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/54a09627
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/54a09627
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/54a09627
Branch: refs/heads/master
Commit: 54a096279a836d1e8098207e1ad943c77fb4a7f2
Parents: b091415
Author: Bill Farner <wf...@apache.org>
Authored: Fri Aug 1 17:08:29 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Aug 1 17:08:29 2014 -0700
----------------------------------------------------------------------
.../scheduler/updater/InstanceUpdater.java | 220 ++++++++++++
.../scheduler/updater/TaskController.java | 54 +++
.../scheduler/updater/InstanceUpdaterTest.java | 343 +++++++++++++++++++
3 files changed, 617 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
new file mode 100644
index 0000000..2863517
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.logging.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+
+/**
+ * In part of a job update, this manages the update of an individual instance. This includes
+ * deciding how to effect an update from a possibly-absent old configuration to a possibly-absent
+ * new configuration, and detecting whether a replaced instance becomes unstable.
+ *
+ * TODO(wfarner): This probably needs to be parameterized so that it may be reused for rollbacks.
+ */
+class InstanceUpdater {
+ private static final Logger LOG = Logger.getLogger(InstanceUpdater.class.getName());
+
+ private final Optional<ITaskConfig> desiredState;
+ private final int toleratedFailures;
+ private final Amount<Long, Time> minRunningTime;
+ private final Amount<Long, Time> maxNonRunningTime;
+ private final Clock clock;
+
+ /**
+ * Keep an optional controller reference so that we may discard the reference after we've
+ * advertised completion through {@link TaskController#updateCompleted(Result) updateCompleted}.
+ * This gives us a signal to no-op (when this is reset to absent), and ensures we can't send any
+ * control signals from that point on.
+ */
+ private Optional<TaskController> controllerRef = Optional.absent();
+
+ private int observedFailures = 0;
+
+ InstanceUpdater(
+ Optional<ITaskConfig> desiredState,
+ int toleratedFailures,
+ Amount<Long, Time> minRunningTime,
+ Amount<Long, Time> maxNonRunningTime,
+ Clock clock,
+ final TaskController controller) {
+
+ this.desiredState = requireNonNull(desiredState);
+ this.toleratedFailures = toleratedFailures;
+ this.minRunningTime = requireNonNull(minRunningTime);
+ this.maxNonRunningTime = requireNonNull(maxNonRunningTime);
+ this.clock = requireNonNull(clock);
+ this.controllerRef = Optional.of(controller);
+ }
+
+ private long millisSince(ITaskEvent event) {
+ return clock.nowMillis() - event.getTimestamp();
+ }
+
+ private boolean appearsStable(IScheduledTask task) {
+ return millisSince(Tasks.getLatestEvent(task)) >= minRunningTime.as(Time.MILLISECONDS);
+ }
+
+ private boolean appearsStuck(IScheduledTask task) {
+ // Walk task events backwards to find the first event, or first non-running event.
+ ITaskEvent earliestNonRunningEvent = task.getTaskEvents().get(0);
+ for (ITaskEvent event : Lists.reverse(task.getTaskEvents())) {
+ if (event.getStatus() == RUNNING) {
+ break;
+ } else {
+ earliestNonRunningEvent = event;
+ }
+ }
+
+ return millisSince(earliestNonRunningEvent) >= maxNonRunningTime.as(Time.MILLISECONDS);
+ }
+
+ private boolean permanentlyKilled(IScheduledTask task) {
+ return Iterables.any(
+ task.getTaskEvents(),
+ Predicates.compose(Predicates.equalTo(ScheduleStatus.KILLING), Tasks.TASK_EVENT_TO_STATUS));
+ }
+
+ private static boolean isKillable(ScheduleStatus status) {
+ return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
+ }
+
+ private void completed(Result status) {
+ controllerRef.get().updateCompleted(status);
+ controllerRef = Optional.absent();
+ }
+
+ /**
+ * Evaluates the state differences between the originally-provided {@code desiredState} and the
+ * provided {@code actualState}, and invokes any necessary actions on the provided
+ * {@link TaskController}.
+ * <p>
+ * This function should be idempotent, with the exception of an internal failure counter that
+ * increments when an updating task exits, or an active but not
+ * {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
+ *
+ * <p>
+ * It is the reponsibility of the caller to ensure that the {@code actualState} is the latest
+ * value. Note: the caller should avoid calling this when a terminal task is moving to another
+ * terminal state. It should also suppress deletion events for tasks that have been replaced by
+ * an active task.
+ *
+ * @param actualState The actual observed state of the task.
+ */
+ synchronized void evaluate(Optional<IScheduledTask> actualState) {
+ if (!controllerRef.isPresent()) {
+ // Avoid any further action if a result was already given.
+ return;
+ }
+
+ TaskController controller = controllerRef.get();
+
+ boolean desiredPresent = desiredState.isPresent();
+ boolean actualPresent = actualState.isPresent();
+
+ if (desiredPresent && actualPresent) {
+ // The update is changing the task configuration.
+ handleActualAndDesiredPresent(actualState.get());
+ } else if (desiredPresent) {
+ // The update is introducing a new instance.
+ controller.addReplacement();
+ } else if (actualPresent) {
+ // The update is removing an instance.
+ if (isKillable(actualState.get().getStatus())) {
+ controller.killTask();
+ }
+ } else {
+ // No-op update.
+ completed(SUCCESS);
+ }
+ }
+
+ private boolean addFailureAndCheckIfFailed() {
+ LOG.info("Observed updated task failure.");
+ observedFailures++;
+ if (observedFailures > toleratedFailures) {
+ completed(FAILED);
+ return true;
+ }
+ return false;
+ }
+
+ private void handleActualAndDesiredPresent(IScheduledTask actualState) {
+ Preconditions.checkState(desiredState.isPresent());
+ Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
+
+ TaskController controller = controllerRef.get();
+
+ ScheduleStatus status = actualState.getStatus();
+ if (desiredState.get().equals(actualState.getAssignedTask().getTask())) {
+ // The desired task is in the system.
+ if (status == RUNNING) {
+ // The desired task is running.
+ if (appearsStable(actualState)) {
+ // Stably running, our work here is done.
+ completed(SUCCESS);
+ } else {
+ // Not running long enough to consider stable, check again later.
+ controller.reevaluteAfterRunningLimit();
+ }
+ } else if (Tasks.isTerminated(status)) {
+ // The desired task has terminated, this is a failure.
+ addFailureAndCheckIfFailed();
+ } else if (appearsStuck(actualState)) {
+ // The task is not running, but not terminated, and appears to have been in this state
+ // long enough that we should intervene.
+ if (!addFailureAndCheckIfFailed()) {
+ controller.killTask();
+ }
+ } else {
+ // The task is in a transient state on the way into or out of running, check back later.
+ controller.reevaluteAfterRunningLimit();
+ }
+ } else {
+ // This is not the configuration that we would like to run.
+ if (isKillable(status)) {
+ // Task is active, kill it.
+ controller.killTask();
+ } else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
+ // The old task has exited, it is now safe to add the new one.
+ controller.addReplacement();
+ }
+ }
+ }
+
+ enum Result {
+ SUCCESS,
+ FAILED
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
new file mode 100644
index 0000000..b725a31
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Controls that an instance updater uses to modify the job instance it is updating, or advertise
+ * that it has reached a result.
+ */
+interface TaskController {
+ /**
+ * Terminates the instance, ensuring that it is not automatically restarted.
+ * <p>
+ * If the implementation wishes to time out on the kill attempt, it should schedule an action
+ * to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a delay.
+ */
+ void killTask();
+
+ /**
+ * Adds a task with the new configuration to replace the old task.
+ * <p>
+ * If the implementation wishes to time out on the task replacement (such as to deal with a
+ * task stuck in {@link org.apache.aurora.gen.ScheduleStatus#PENDING}, it should schedule an
+ * action to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a
+ * delay.
+ */
+ void addReplacement();
+
+ /**
+ * Requests that the updater be re-evaluated after the amount of time a task must remain in
+ * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING} to be considered stable. The re-evaluation
+ * should be after the same amount of time as {@code minRunningTime}.
+ */
+ void reevaluteAfterRunningLimit();
+
+ /**
+ * Announces a result of the attempt to update the instance.
+ * <p>
+ * Once this callback has been made, the updater will no-op on any subsequent evaluations.
+ *
+ * @param status Update result.
+ */
+ void updateCompleted(InstanceUpdater.Result status);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/54a09627/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
new file mode 100644
index 0000000..a845363
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class InstanceUpdaterTest extends EasyMockTest {
+ private static final Optional<ITaskConfig> NO_CONFIG = Optional.absent();
+ private static final Optional<IScheduledTask> NO_TASK = Optional.absent();
+
+ private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig().setNumCpus(1.0));
+ private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig().setNumCpus(2.0));
+
+ private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
+ private static final Amount<Long, Time> MAX_NON_RUNNING_TIME = Amount.of(5L, Time.MINUTES);
+
+ private FakeClock clock;
+ private TaskController controller;
+ private InstanceUpdater updater;
+
+ @Before
+ public void setUp() {
+ clock = new FakeClock();
+ controller = createMock(TaskController.class);
+ }
+
+ private void newUpdater(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
+ updater = new InstanceUpdater(
+ newConfig,
+ maxToleratedFailures,
+ MIN_RUNNING_TIME,
+ MAX_NON_RUNNING_TIME,
+ clock,
+ controller);
+ }
+
+ private void newUpdater(ITaskConfig newConfig, int maxToleratedFailures) {
+ newUpdater(Optional.of(newConfig), maxToleratedFailures);
+ }
+
+ private IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
+ List<TaskEvent> events = Lists.newArrayList();
+ if (status != PENDING) {
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+ }
+ if (Tasks.isTerminated(status) || status == KILLING) {
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+ }
+
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+
+ return IScheduledTask.build(
+ new ScheduledTask()
+ .setStatus(status)
+ .setTaskEvents(ImmutableList.copyOf(events))
+ .setAssignedTask(
+ new AssignedTask()
+ .setTask(config.newBuilder())));
+ }
+
+ private void evaluate(ITaskConfig config, ScheduleStatus status, ScheduleStatus... statuses) {
+ IScheduledTask task = makeTask(config, status);
+
+ updater.evaluate(Optional.of(task));
+ for (ScheduleStatus s : statuses) {
+ ScheduledTask builder = task.newBuilder();
+ builder.setStatus(s);
+ builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(s));
+ task = IScheduledTask.build(builder);
+ updater.evaluate(Optional.of(task));
+ }
+ }
+
+ @Test
+ public void testSuccessfulUpdate() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(4);
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ updater.evaluate(Optional.of(task));
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ }
+
+ @Test
+ public void testUpdateRetryOnTaskExit() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(8);
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ updater.evaluate(Optional.of(task));
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ }
+
+ @Test
+ public void testUpdateRetryFailure() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(4);
+ controller.updateCompleted(Result.FAILED);
+
+ control.replay();
+
+ newUpdater(NEW, 0);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+ }
+
+ @Test
+ public void testNoopUpdate() {
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ }
+
+ @Test
+ public void testPointlessUpdate() {
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NO_CONFIG, 1);
+ updater.evaluate(NO_TASK);
+ }
+
+ @Test
+ public void testNoOldConfig() {
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(4);
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ updater.evaluate(NO_TASK);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ updater.evaluate(Optional.of(task));
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ }
+
+ @Test
+ public void testNoNewConfig() {
+ controller.killTask();
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NO_CONFIG, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ updater.evaluate(NO_TASK);
+ }
+
+ @Test
+ public void testAvoidsMultipleResults() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(4);
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ updater.evaluate(Optional.of(task));
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ // The extra evaluation should not result in another updateCompleted().
+ updater.evaluate(Optional.of(task));
+ }
+
+ @Test
+ public void testStuckInPending() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(2);
+ controller.killTask();
+ controller.addReplacement();
+ controller.updateCompleted(Result.FAILED);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING);
+ IScheduledTask task1 = makeTask(NEW, PENDING);
+ clock.advance(MAX_NON_RUNNING_TIME);
+ updater.evaluate(Optional.of(task1));
+ updater.evaluate(NO_TASK);
+ evaluate(NEW, PENDING);
+ IScheduledTask task2 = makeTask(NEW, PENDING);
+ clock.advance(MAX_NON_RUNNING_TIME);
+ updater.evaluate(Optional.of(task2));
+ }
+
+ @Test
+ public void testStuckInKilling() {
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(6);
+ controller.killTask();
+ controller.addReplacement();
+ controller.updateCompleted(Result.FAILED);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+ evaluate(OLD, RUNNING, KILLING, FINISHED);
+ evaluate(NEW, PENDING);
+ IScheduledTask task1 = makeTask(NEW, PENDING);
+ clock.advance(MAX_NON_RUNNING_TIME);
+ updater.evaluate(Optional.of(task1));
+ updater.evaluate(NO_TASK);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING);
+ IScheduledTask task2 = makeTask(NEW, KILLING);
+ updater.evaluate(Optional.of(task2));
+ clock.advance(MAX_NON_RUNNING_TIME);
+ updater.evaluate(Optional.of(task2));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidInput() {
+ control.replay();
+
+ newUpdater(NEW, 1);
+ IScheduledTask noEvents = IScheduledTask.build(
+ makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of()));
+ updater.evaluate(Optional.of(noEvents));
+ }
+
+ @Test
+ public void testOldTaskDies() {
+ // If the original task dies, the updater should not add a replacement if the task will be
+ // resuscitated. Only a task that has passed through KILLING will not be resuscitated.
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+
+ // Task did not pass through KILLING, therefore will be rescheudled.
+ evaluate(OLD, FINISHED);
+ }
+
+ @Test
+ public void testOldTaskDiesAndRescheduled() {
+ // Identical to testOldTaskDies, with the follow-through of rescheduling and updating.
+ // If the original task dies, the updater should not add a replacement if the task will be
+ // resuscitated. Only a task that has passed through KILLING will not be resuscitated.
+
+ controller.killTask();
+ controller.addReplacement();
+ controller.reevaluteAfterRunningLimit();
+ expectLastCall().times(4);
+ controller.updateCompleted(SUCCESS);
+
+ control.replay();
+
+ newUpdater(NEW, 1);
+
+ // Task did not pass through KILLING, therefore will be rescheudled.
+ evaluate(OLD, FINISHED);
+ evaluate(OLD, PENDING);
+ updater.evaluate(NO_TASK);
+ evaluate(NEW, PENDING, ASSIGNED, STARTING);
+ IScheduledTask task = makeTask(NEW, RUNNING);
+ updater.evaluate(Optional.of(task));
+ clock.advance(MIN_RUNNING_TIME);
+ updater.evaluate(Optional.of(task));
+ }
+}