You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/03/22 02:11:35 UTC
git commit: Preserving sandbox deleted task history (part 1: internal enum).
Repository: incubator-aurora
Updated Branches:
refs/heads/master 15679b3db -> 1a17c4426
Preserving sandbox deleted task history (part 1: internal enum).
Bugs closed: AURORA-261
Reviewed at https://reviews.apache.org/r/19436/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1a17c442
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1a17c442
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1a17c442
Branch: refs/heads/master
Commit: 1a17c442657def645c75247ad4b4b4983fb349df
Parents: 15679b3
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Mar 21 18:11:09 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Mar 21 18:11:09 2014 -0700
----------------------------------------------------------------------
.../scheduler/state/StateManagerImpl.java | 2 +-
.../scheduler/state/TaskStateMachine.java | 146 ++++++++++++-------
.../scheduler/state/TaskStateMachineTest.java | 8 +-
3 files changed, 95 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 7371c12..0161fac 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -344,7 +344,7 @@ public class StateManagerImpl implements StateManager {
@Override
public IScheduledTask apply(IScheduledTask task) {
ScheduledTask mutableTask = task.newBuilder();
- mutableTask.setStatus(stateMachine.getState());
+ mutableTask.setStatus(targetState);
mutableTask.addToTaskEvents(new TaskEvent()
.setTimestamp(clock.nowMillis())
.setStatus(targetState)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 15d1c1f..d2becea 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -21,6 +21,7 @@ import java.util.logging.Logger;
import javax.annotation.Nullable;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -41,21 +42,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
import static org.apache.aurora.scheduler.state.SideEffect.Action;
import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
@@ -63,6 +49,21 @@ import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.ASSIGNED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.DRAINING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.FAILED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.FINISHED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RUNNING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.STARTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.THROTTLED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.UNKNOWN;
/**
* State machine for a task.
@@ -81,11 +82,50 @@ class TaskStateMachine {
private static final AtomicLong ILLEGAL_TRANSITIONS =
Stats.exportLong("scheduler_illegal_task_state_transitions");
- private final StateMachine<ScheduleStatus> stateMachine;
- private ScheduleStatus previousState = null;
+ private final StateMachine<TaskState> stateMachine;
+ private Optional<TaskState> previousState = Optional.absent();
private final Set<SideEffect> sideEffects = Sets.newHashSet();
+ private static final Function<IScheduledTask, TaskState> SCHEDULED_TO_TASK_STATE =
+ new Function<IScheduledTask, TaskState>() {
+ @Override
+ public TaskState apply(IScheduledTask task) {
+ return TaskState.valueOf(task.getStatus().name());
+ }
+ };
+
+ /**
+ * ScheduleStatus enum extension to account for cases where no direct state mapping exists.
+ */
+ enum TaskState {
+ INIT(Optional.of(ScheduleStatus.INIT)),
+ THROTTLED(Optional.of(ScheduleStatus.THROTTLED)),
+ PENDING(Optional.of(ScheduleStatus.PENDING)),
+ ASSIGNED(Optional.of(ScheduleStatus.ASSIGNED)),
+ STARTING(Optional.of(ScheduleStatus.STARTING)),
+ RUNNING(Optional.of(ScheduleStatus.RUNNING)),
+ FINISHED(Optional.of(ScheduleStatus.FINISHED)),
+ PREEMPTING(Optional.of(ScheduleStatus.PREEMPTING)),
+ RESTARTING(Optional.of(ScheduleStatus.RESTARTING)),
+ DRAINING(Optional.of(ScheduleStatus.DRAINING)),
+ FAILED(Optional.of(ScheduleStatus.FAILED)),
+ KILLED(Optional.of(ScheduleStatus.KILLED)),
+ KILLING(Optional.of(ScheduleStatus.KILLING)),
+ LOST(Optional.of(ScheduleStatus.LOST)),
+ UNKNOWN(Optional.of(ScheduleStatus.UNKNOWN));
+
+ private final Optional<ScheduleStatus> status;
+
+ TaskState(Optional<ScheduleStatus> status) {
+ this.status = status;
+ }
+
+ Optional<ScheduleStatus> getStatus() {
+ return status;
+ }
+ }
+
/**
* Creates a new task state machine representing a non-existent task. This allows for consistent
* state-reconciliation actions when the external system disagrees with the scheduler.
@@ -110,7 +150,7 @@ class TaskStateMachine {
MorePreconditions.checkNotBlank(name);
checkNotNull(task);
- final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+ final TaskState initialState = task.transform(SCHEDULED_TO_TASK_STATE).or(UNKNOWN);
if (task.isPresent()) {
Preconditions.checkState(
initialState != UNKNOWN,
@@ -118,11 +158,11 @@ class TaskStateMachine {
} else {
Preconditions.checkState(
initialState == UNKNOWN,
- "A task that does not exist must start un UNKNOWN state.");
+ "A task that does not exist must start in UNKNOWN state.");
}
- Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
- ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+ Closure<Transition<TaskState>> manageTerminatedTasks = Closures.combine(
+ ImmutableList.<Closure<Transition<TaskState>>>builder()
// Kill a task that we believe to be terminated when an attempt is made to revive.
.add(
Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
@@ -131,10 +171,10 @@ class TaskStateMachine {
.add(Closures.filter(Transition.to(UNKNOWN), addFollowupClosure(DELETE)))
.build());
- final Closure<Transition<ScheduleStatus>> manageRestartingTask =
- new Closure<Transition<ScheduleStatus>>() {
+ final Closure<Transition<TaskState>> manageRestartingTask =
+ new Closure<Transition<TaskState>>() {
@Override
- public void execute(Transition<ScheduleStatus> transition) {
+ public void execute(Transition<TaskState> transition) {
switch (transition.getTo()) {
case ASSIGNED:
addFollowup(KILL);
@@ -206,10 +246,10 @@ class TaskStateMachine {
}
};
- final Closure<Transition<ScheduleStatus>> deleteIfKilling =
+ final Closure<Transition<TaskState>> deleteIfKilling =
Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
- stateMachine = StateMachine.<ScheduleStatus>builder(name)
+ stateMachine = StateMachine.<TaskState>builder(name)
.logTransitions()
.initialState(initialState)
.addState(
@@ -228,9 +268,9 @@ class TaskStateMachine {
.to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING,
KILLED, KILLING, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<ScheduleStatus>>() {
+ new Closure<Transition<TaskState>>() {
@Override
- public void execute(Transition<ScheduleStatus> transition) {
+ public void execute(Transition<TaskState> transition) {
switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
@@ -276,9 +316,9 @@ class TaskStateMachine {
.to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING,
KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<ScheduleStatus>>() {
+ new Closure<Transition<TaskState>>() {
@Override
- public void execute(Transition<ScheduleStatus> transition) {
+ public void execute(Transition<TaskState> transition) {
switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
@@ -328,9 +368,9 @@ class TaskStateMachine {
Rule.from(RUNNING)
.to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<ScheduleStatus>>() {
+ new Closure<Transition<TaskState>>() {
@Override
- public void execute(Transition<ScheduleStatus> transition) {
+ public void execute(Transition<TaskState> transition) {
switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
@@ -413,12 +453,12 @@ class TaskStateMachine {
// Since we want this action to be performed last in the transition sequence, the callback
// must be the last chained transition callback.
.onAnyTransition(
- new Closure<Transition<ScheduleStatus>>() {
+ new Closure<Transition<TaskState>>() {
@Override
- public void execute(final Transition<ScheduleStatus> transition) {
+ public void execute(final Transition<TaskState> transition) {
if (transition.isValidStateChange()) {
- ScheduleStatus from = transition.getFrom();
- ScheduleStatus to = transition.getTo();
+ TaskState from = transition.getFrom();
+ TaskState to = transition.getTo();
// TODO(wfarner): Clean up this hack. This is here to suppress unnecessary work
// (save followed by delete), but it shows a wart with this catch-all behavior.
@@ -430,7 +470,7 @@ class TaskStateMachine {
if ((to != UNKNOWN) && pendingDeleteHack) {
addFollowup(SAVE_STATE);
}
- previousState = from;
+ previousState = Optional.of(from);
} else {
LOG.severe("Illegal state transition attempted: " + transition);
ILLEGAL_TRANSITIONS.incrementAndGet();
@@ -451,8 +491,8 @@ class TaskStateMachine {
addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
}
- private void addFollowupTransition(ScheduleStatus status) {
- addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
+ private void addFollowupTransition(TaskState state) {
+ addFollowup(new SideEffect(STATE_CHANGE, state.getStatus()));
}
private void addFollowup(SideEffect sideEffect) {
@@ -460,10 +500,10 @@ class TaskStateMachine {
sideEffects.add(sideEffect);
}
- private Closure<Transition<ScheduleStatus>> addFollowupClosure(final Action action) {
- return new Closure<Transition<ScheduleStatus>>() {
+ private Closure<Transition<TaskState>> addFollowupClosure(final Action action) {
+ return new Closure<Transition<TaskState>>() {
@Override
- public void execute(Transition<ScheduleStatus> item) {
+ public void execute(Transition<TaskState> item) {
addFollowup(action);
}
};
@@ -486,27 +526,18 @@ class TaskStateMachine {
* state transition (e.g. storing resource consumption of a running task), we need to find
* a different way to suppress noop transitions.
*/
- if (stateMachine.getState() == status) {
+ TaskState taskState = TaskState.valueOf(status.name());
+ if (stateMachine.getState() == taskState) {
return new TransitionResult(false, ImmutableSet.<SideEffect>of());
}
- boolean success = stateMachine.transition(status);
+ boolean success = stateMachine.transition(taskState);
ImmutableSet<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
sideEffects.clear();
return new TransitionResult(success, transitionEffects);
}
/**
- * Fetch the current state from the state machine.
- * TODO(wfarner): Consider removing, the caller should know this.
- *
- * @return The current state.
- */
- public synchronized ScheduleStatus getState() {
- return stateMachine.getState();
- }
-
- /**
* Gets the previous state of this state machine.
*
* @return The state machine's previous state, or {@code null} if the state machine has not
@@ -514,7 +545,12 @@ class TaskStateMachine {
*/
@Nullable
ScheduleStatus getPreviousState() {
- return previousState;
+ return previousState.transform(new Function<TaskState, ScheduleStatus>() {
+ @Override
+ public ScheduleStatus apply(TaskState item) {
+ return item.getStatus().orNull();
+ }
+ }).orNull();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index 77380d9..e77063a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -55,6 +55,7 @@ import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -261,11 +262,10 @@ public class TaskStateMachineTest {
}
private void legalTransition(ScheduleStatus state, Set<SideEffect.Action> expectedActions) {
- ScheduleStatus initialStatus = stateMachine.getState();
+ ScheduleStatus previousState = stateMachine.getPreviousState();
TransitionResult result = stateMachine.updateState(state);
assertTrue("Transition to " + state + " was not successful", result.isSuccess());
- assertEquals(initialStatus, stateMachine.getPreviousState());
- assertEquals(state, stateMachine.getState());
+ assertNotEquals(previousState, stateMachine.getPreviousState());
assertEquals(
FluentIterable.from(expectedActions).transform(TO_SIDE_EFFECT).toSet(),
result.getSideEffects());
@@ -285,9 +285,7 @@ public class TaskStateMachineTest {
}
private void illegalTransition(ScheduleStatus state, Set<SideEffect> sideEffects) {
- ScheduleStatus initialStatus = stateMachine.getState();
TransitionResult result = stateMachine.updateState(state);
- assertEquals(initialStatus, stateMachine.getState());
assertFalse(result.isSuccess());
assertEquals(sideEffects, result.getSideEffects());
}