You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/06 21:33:16 UTC
git commit: Refactor InstanceUpdater to remove the callback interface.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 7f21e3f41 -> dd91370f8
Refactor InstanceUpdater to remove the callback interface.
Reviewed at https://reviews.apache.org/r/24357/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/dd91370f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/dd91370f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/dd91370f
Branch: refs/heads/master
Commit: dd91370f8e4d5093a2b01ae5294853b400dcec78
Parents: 7f21e3f
Author: Bill Farner <wf...@apache.org>
Authored: Wed Aug 6 12:29:36 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Aug 6 12:29:36 2014 -0700
----------------------------------------------------------------------
.../scheduler/updater/InstanceUpdater.java | 90 ++---
.../scheduler/updater/TaskController.java | 54 ---
.../scheduler/updater/InstanceUpdaterTest.java | 390 ++++++++-----------
.../aurora/scheduler/updater/TaskUtil.java | 64 +++
4 files changed, 260 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index 2863517..7476d82 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -33,8 +33,12 @@ import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
/**
* In part of a job update, this manages the update of an individual instance. This includes
@@ -52,14 +56,6 @@ class InstanceUpdater {
private final Amount<Long, Time> maxNonRunningTime;
private final Clock clock;
- /**
- * Keep an optional controller reference so that we may discard the reference after we've
- * advertised completion through {@link TaskController#updateCompleted(Result) updateCompleted}.
- * This gives us a signal to no-op (when this is reset to absent), and ensures we can't send any
- * control signals from that point on.
- */
- private Optional<TaskController> controllerRef = Optional.absent();
-
private int observedFailures = 0;
InstanceUpdater(
@@ -67,15 +63,13 @@ class InstanceUpdater {
int toleratedFailures,
Amount<Long, Time> minRunningTime,
Amount<Long, Time> maxNonRunningTime,
- Clock clock,
- final TaskController controller) {
+ Clock clock) {
this.desiredState = requireNonNull(desiredState);
this.toleratedFailures = toleratedFailures;
this.minRunningTime = requireNonNull(minRunningTime);
this.maxNonRunningTime = requireNonNull(maxNonRunningTime);
this.clock = requireNonNull(clock);
- this.controllerRef = Optional.of(controller);
}
private long millisSince(ITaskEvent event) {
@@ -110,72 +104,55 @@ class InstanceUpdater {
return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
}
- private void completed(Result status) {
- controllerRef.get().updateCompleted(status);
- controllerRef = Optional.absent();
- }
-
/**
* Evaluates the state differences between the originally-provided {@code desiredState} and the
- * provided {@code actualState}, and invokes any necessary actions on the provided
- * {@link TaskController}.
+ * provided {@code actualState}.
* <p>
* This function should be idempotent, with the exception of an internal failure counter that
* increments when an updating task exits, or an active but not
* {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
*
* <p>
- * It is the reponsibility of the caller to ensure that the {@code actualState} is the latest
+ * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
* value. Note: the caller should avoid calling this when a terminal task is moving to another
* terminal state. It should also suppress deletion events for tasks that have been replaced by
* an active task.
*
* @param actualState The actual observed state of the task.
+ * @return the evaluation result, including the state of the instance update, and a necessary
+ * action to perform.
*/
- synchronized void evaluate(Optional<IScheduledTask> actualState) {
- if (!controllerRef.isPresent()) {
- // Avoid any further action if a result was already given.
- return;
- }
-
- TaskController controller = controllerRef.get();
-
+ synchronized Result evaluate(Optional<IScheduledTask> actualState) {
boolean desiredPresent = desiredState.isPresent();
boolean actualPresent = actualState.isPresent();
if (desiredPresent && actualPresent) {
// The update is changing the task configuration.
- handleActualAndDesiredPresent(actualState.get());
+ return handleActualAndDesiredPresent(actualState.get());
} else if (desiredPresent) {
// The update is introducing a new instance.
- controller.addReplacement();
+ return REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
} else if (actualPresent) {
// The update is removing an instance.
- if (isKillable(actualState.get().getStatus())) {
- controller.killTask();
- }
+ return isKillable(actualState.get().getStatus())
+ ? KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE
+ : EVALUATE_ON_STATE_CHANGE;
} else {
// No-op update.
- completed(SUCCESS);
+ return SUCCEEDED;
}
}
- private boolean addFailureAndCheckIfFailed() {
+ private Result addFailureAndCheckIfFailed() {
LOG.info("Observed updated task failure.");
observedFailures++;
- if (observedFailures > toleratedFailures) {
- completed(FAILED);
- return true;
- }
- return false;
+ return observedFailures > toleratedFailures ? FAILED : EVALUATE_ON_STATE_CHANGE;
}
- private void handleActualAndDesiredPresent(IScheduledTask actualState) {
+ private Result handleActualAndDesiredPresent(IScheduledTask actualState) {
Preconditions.checkState(desiredState.isPresent());
Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
- TaskController controller = controllerRef.get();
-
ScheduleStatus status = actualState.getStatus();
if (desiredState.get().equals(actualState.getAssignedTask().getTask())) {
// The desired task is in the system.
@@ -183,38 +160,45 @@ class InstanceUpdater {
// The desired task is running.
if (appearsStable(actualState)) {
// Stably running, our work here is done.
- completed(SUCCESS);
+ return SUCCEEDED;
} else {
// Not running long enough to consider stable, check again later.
- controller.reevaluteAfterRunningLimit();
+ return EVALUATE_AFTER_RUNNING_LIMIT;
}
} else if (Tasks.isTerminated(status)) {
// The desired task has terminated, this is a failure.
- addFailureAndCheckIfFailed();
+ return addFailureAndCheckIfFailed();
} else if (appearsStuck(actualState)) {
// The task is not running, but not terminated, and appears to have been in this state
// long enough that we should intervene.
- if (!addFailureAndCheckIfFailed()) {
- controller.killTask();
- }
+ Result updaterStatus = addFailureAndCheckIfFailed();
+ return (updaterStatus == FAILED)
+ ? updaterStatus
+ : KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
} else {
// The task is in a transient state on the way into or out of running, check back later.
- controller.reevaluteAfterRunningLimit();
+ return EVALUATE_AFTER_RUNNING_LIMIT;
}
} else {
// This is not the configuration that we would like to run.
if (isKillable(status)) {
// Task is active, kill it.
- controller.killTask();
+ return Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
} else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
// The old task has exited, it is now safe to add the new one.
- controller.addReplacement();
+ return Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
}
}
+
+ return EVALUATE_ON_STATE_CHANGE;
}
enum Result {
- SUCCESS,
+ EVALUATE_ON_STATE_CHANGE,
+ REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ EVALUATE_AFTER_RUNNING_LIMIT,
+ SUCCEEDED,
FAILED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java b/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
deleted file mode 100644
index b725a31..0000000
--- a/src/main/java/org/apache/aurora/scheduler/updater/TaskController.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.updater;
-
-/**
- * Controls that an instance updater uses to modify the job instance it is updating, or advertise
- * that it has reached a result.
- */
-interface TaskController {
- /**
- * Terminates the instance, ensuring that it is not automatically restarted.
- * <p>
- * If the implementation wishes to time out on the kill attempt, it should schedule an action
- * to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a delay.
- */
- void killTask();
-
- /**
- * Adds a task with the new configuration to replace the old task.
- * <p>
- * If the implementation wishes to time out on the task replacement (such as to deal with a
- * task stuck in {@link org.apache.aurora.gen.ScheduleStatus#PENDING}, it should schedule an
- * action to invoke {@link InstanceUpdater#evaluate(com.google.common.base.Optional)} after a
- * delay.
- */
- void addReplacement();
-
- /**
- * Requests that the updater be re-evaluated after the amount of time a task must remain in
- * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING} to be considered stable. The re-evaluation
- * should be after the same amount of time as {@code minRunningTime}.
- */
- void reevaluteAfterRunningLimit();
-
- /**
- * Announces a result of the attempt to update the instance.
- * <p>
- * Once this callback has been made, the updater will no-op on any subsequent evaluations.
- *
- * @param status Update result.
- */
- void updateCompleted(InstanceUpdater.Result status);
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index a845363..dda1b73 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -13,25 +13,18 @@
*/
package org.apache.aurora.scheduler.updater;
-import java.util.List;
-
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
-import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
@@ -42,12 +35,15 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCESS;
-import static org.easymock.EasyMock.expectLastCall;
-
-public class InstanceUpdaterTest extends EasyMockTest {
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.junit.Assert.assertEquals;
+
+public class InstanceUpdaterTest {
private static final Optional<ITaskConfig> NO_CONFIG = Optional.absent();
- private static final Optional<IScheduledTask> NO_TASK = Optional.absent();
private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig().setNumCpus(1.0));
private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig().setNumCpus(2.0));
@@ -55,250 +51,191 @@ public class InstanceUpdaterTest extends EasyMockTest {
private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
private static final Amount<Long, Time> MAX_NON_RUNNING_TIME = Amount.of(5L, Time.MINUTES);
- private FakeClock clock;
- private TaskController controller;
- private InstanceUpdater updater;
-
- @Before
- public void setUp() {
- clock = new FakeClock();
- controller = createMock(TaskController.class);
- }
+ private static class TestFixture {
+ private final FakeClock clock;
+ private final InstanceUpdater updater;
+ private final TaskUtil taskUtil;
+ private Optional<IScheduledTask> task = Optional.absent();
+
+ TestFixture(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
+ this.clock = new FakeClock();
+ this.updater = new InstanceUpdater(
+ newConfig,
+ maxToleratedFailures,
+ MIN_RUNNING_TIME,
+ MAX_NON_RUNNING_TIME,
+ clock);
+ this.taskUtil = new TaskUtil(clock);
+ }
- private void newUpdater(Optional<ITaskConfig> newConfig, int maxToleratedFailures) {
- updater = new InstanceUpdater(
- newConfig,
- maxToleratedFailures,
- MIN_RUNNING_TIME,
- MAX_NON_RUNNING_TIME,
- clock,
- controller);
- }
+ TestFixture(ITaskConfig newConfig, int maxToleratedFailures) {
+ this(Optional.of(newConfig), maxToleratedFailures);
+ }
- private void newUpdater(ITaskConfig newConfig, int maxToleratedFailures) {
- newUpdater(Optional.of(newConfig), maxToleratedFailures);
- }
+ void setActualState(ITaskConfig config) {
+ this.task = Optional.of(taskUtil.makeTask(config, PENDING));
+ }
- private IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
- List<TaskEvent> events = Lists.newArrayList();
- if (status != PENDING) {
- events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+ void setActualStateAbsent() {
+ this.task = Optional.absent();
}
- if (Tasks.isTerminated(status) || status == KILLING) {
- events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
- events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+
+ private Result changeStatusAndEvaluate(ScheduleStatus status) {
+ ScheduledTask builder = task.get().newBuilder();
+ if (builder.getStatus() != status) {
+ // Only add a task event if this is a state change.
+ builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+ }
+ builder.setStatus(status);
+
+ task = Optional.of(IScheduledTask.build(builder));
+ return updater.evaluate(task);
}
- events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+ void evaluateCurrentState(Result expectedResult) {
+ assertEquals(expectedResult, updater.evaluate(task));
+ }
- return IScheduledTask.build(
- new ScheduledTask()
- .setStatus(status)
- .setTaskEvents(ImmutableList.copyOf(events))
- .setAssignedTask(
- new AssignedTask()
- .setTask(config.newBuilder())));
- }
+ void evaluate(
+ Result expectedResult,
+ ScheduleStatus status,
+ ScheduleStatus... statuses) {
- private void evaluate(ITaskConfig config, ScheduleStatus status, ScheduleStatus... statuses) {
- IScheduledTask task = makeTask(config, status);
+ assertEquals(expectedResult, changeStatusAndEvaluate(status));
+ for (ScheduleStatus s : statuses) {
+ assertEquals(expectedResult, changeStatusAndEvaluate(s));
+ }
+ }
- updater.evaluate(Optional.of(task));
- for (ScheduleStatus s : statuses) {
- ScheduledTask builder = task.newBuilder();
- builder.setStatus(s);
- builder.addToTaskEvents(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(s));
- task = IScheduledTask.build(builder);
- updater.evaluate(Optional.of(task));
+ void advanceTime(Amount<Long, Time> time) {
+ clock.advance(time);
}
}
@Test
public void testSuccessfulUpdate() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(4);
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING, ASSIGNED, STARTING);
- IScheduledTask task = makeTask(NEW, RUNNING);
- updater.evaluate(Optional.of(task));
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+ f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+ f.advanceTime(MIN_RUNNING_TIME);
+ f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateRetryOnTaskExit() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(8);
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
- evaluate(NEW, PENDING, ASSIGNED, STARTING);
- IScheduledTask task = makeTask(NEW, RUNNING);
- updater.evaluate(Optional.of(task));
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+ f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, FAILED);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, RUNNING);
+ f.advanceTime(MIN_RUNNING_TIME);
+ f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testUpdateRetryFailure() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(4);
- controller.updateCompleted(Result.FAILED);
-
- control.replay();
-
- newUpdater(NEW, 0);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+ TestFixture f = new TestFixture(NEW, 0);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+ f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+ f.evaluate(Result.FAILED, FAILED);
}
@Test
public void testNoopUpdate() {
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
- IScheduledTask task = makeTask(NEW, RUNNING);
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, RUNNING);
+ f.advanceTime(MIN_RUNNING_TIME);
+ f.evaluate(SUCCEEDED, RUNNING);
}
@Test
public void testPointlessUpdate() {
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NO_CONFIG, 1);
- updater.evaluate(NO_TASK);
+ TestFixture f = new TestFixture(NO_CONFIG, 1);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testNoOldConfig() {
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(4);
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
- updater.evaluate(NO_TASK);
- evaluate(NEW, PENDING, ASSIGNED, STARTING);
- IScheduledTask task = makeTask(NEW, RUNNING);
- updater.evaluate(Optional.of(task));
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+ f.advanceTime(MIN_RUNNING_TIME);
+ f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testNoNewConfig() {
- controller.killTask();
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NO_CONFIG, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- updater.evaluate(NO_TASK);
- }
-
- @Test
- public void testAvoidsMultipleResults() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(4);
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING, ASSIGNED, STARTING);
- IScheduledTask task = makeTask(NEW, RUNNING);
- updater.evaluate(Optional.of(task));
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
- // The extra evaluation should not result in another updateCompleted().
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NO_CONFIG, 1);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING, FINISHED);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(SUCCEEDED);
}
@Test
public void testStuckInPending() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(2);
- controller.killTask();
- controller.addReplacement();
- controller.updateCompleted(Result.FAILED);
-
- control.replay();
-
- newUpdater(NEW, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING);
- IScheduledTask task1 = makeTask(NEW, PENDING);
- clock.advance(MAX_NON_RUNNING_TIME);
- updater.evaluate(Optional.of(task1));
- updater.evaluate(NO_TASK);
- evaluate(NEW, PENDING);
- IScheduledTask task2 = makeTask(NEW, PENDING);
- clock.advance(MAX_NON_RUNNING_TIME);
- updater.evaluate(Optional.of(task2));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+ f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+ f.advanceTime(MAX_NON_RUNNING_TIME);
+ f.evaluateCurrentState(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+ f.advanceTime(MAX_NON_RUNNING_TIME);
+ f.evaluateCurrentState(Result.FAILED);
}
@Test
public void testStuckInKilling() {
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(6);
- controller.killTask();
- controller.addReplacement();
- controller.updateCompleted(Result.FAILED);
-
- control.replay();
-
- newUpdater(NEW, 1);
- evaluate(OLD, RUNNING, KILLING, FINISHED);
- evaluate(NEW, PENDING);
- IScheduledTask task1 = makeTask(NEW, PENDING);
- clock.advance(MAX_NON_RUNNING_TIME);
- updater.evaluate(Optional.of(task1));
- updater.evaluate(NO_TASK);
- evaluate(NEW, PENDING, ASSIGNED, STARTING, RUNNING);
- IScheduledTask task2 = makeTask(NEW, KILLING);
- updater.evaluate(Optional.of(task2));
- clock.advance(MAX_NON_RUNNING_TIME);
- updater.evaluate(Optional.of(task2));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+ f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+ f.advanceTime(MAX_NON_RUNNING_TIME);
+ f.evaluateCurrentState(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, ASSIGNED, STARTING, RUNNING);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, KILLING);
+ f.advanceTime(MAX_NON_RUNNING_TIME);
+ f.evaluateCurrentState(Result.FAILED);
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidInput() {
- control.replay();
-
- newUpdater(NEW, 1);
- IScheduledTask noEvents = IScheduledTask.build(
- makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of()));
- updater.evaluate(Optional.of(noEvents));
+ TestFixture f = new TestFixture(NEW, 1);
+ ScheduledTask noEvents = new TaskUtil(new FakeClock())
+ .makeTask(OLD, RUNNING).newBuilder().setTaskEvents(ImmutableList.<TaskEvent>of());
+ f.updater.evaluate(Optional.of(IScheduledTask.build(noEvents)));
}
@Test
@@ -306,12 +243,10 @@ public class InstanceUpdaterTest extends EasyMockTest {
// If the original task dies, the updater should not add a replacement if the task will be
// resuscitated. Only a task that has passed through KILLING will not be resuscitated.
- control.replay();
-
- newUpdater(NEW, 1);
-
- // Task did not pass through KILLING, therefore will be rescheudled.
- evaluate(OLD, FINISHED);
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+ // Task did not pass through KILLING, therefore will be rescheduled.
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
}
@Test
@@ -320,24 +255,17 @@ public class InstanceUpdaterTest extends EasyMockTest {
// If the original task dies, the updater should not add a replacement if the task will be
// resuscitated. Only a task that has passed through KILLING will not be resuscitated.
- controller.killTask();
- controller.addReplacement();
- controller.reevaluteAfterRunningLimit();
- expectLastCall().times(4);
- controller.updateCompleted(SUCCESS);
-
- control.replay();
-
- newUpdater(NEW, 1);
-
- // Task did not pass through KILLING, therefore will be rescheudled.
- evaluate(OLD, FINISHED);
- evaluate(OLD, PENDING);
- updater.evaluate(NO_TASK);
- evaluate(NEW, PENDING, ASSIGNED, STARTING);
- IScheduledTask task = makeTask(NEW, RUNNING);
- updater.evaluate(Optional.of(task));
- clock.advance(MIN_RUNNING_TIME);
- updater.evaluate(Optional.of(task));
+ TestFixture f = new TestFixture(NEW, 1);
+ f.setActualState(OLD);
+
+ // Task did not pass through KILLING, therefore will be rescheduled.
+ f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
+ f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, PENDING);
+ f.setActualStateAbsent();
+ f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ f.setActualState(NEW);
+ f.evaluate(EVALUATE_AFTER_RUNNING_LIMIT, PENDING, ASSIGNED, STARTING, RUNNING);
+ f.advanceTime(MIN_RUNNING_TIME);
+ f.evaluateCurrentState(SUCCEEDED);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/dd91370f/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java b/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
new file mode 100644
index 0000000..0e67f91
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/TaskUtil.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+
+final class TaskUtil {
+
+ private final FakeClock clock;
+
+ TaskUtil(FakeClock clock) {
+ this.clock = Objects.requireNonNull(clock);
+ }
+
+ IScheduledTask makeTask(ITaskConfig config, ScheduleStatus status) {
+ List<TaskEvent> events = Lists.newArrayList();
+ if (status != PENDING) {
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(PENDING));
+ }
+ if (Tasks.isTerminated(status) || status == KILLING) {
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(ASSIGNED));
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(RUNNING));
+ }
+
+ events.add(new TaskEvent().setTimestamp(clock.nowMillis()).setStatus(status));
+
+ return IScheduledTask.build(
+ new ScheduledTask()
+ .setStatus(status)
+ .setTaskEvents(ImmutableList.copyOf(events))
+ .setAssignedTask(
+ new AssignedTask()
+ .setTask(config.newBuilder())));
+ }
+}