You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/03/22 02:11:35 UTC

git commit: Preserving sandbox deleted task history (part 1: internal enum).

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 15679b3db -> 1a17c4426


Preserving sandbox deleted task history (part 1: internal enum).

Bugs closed: AURORA-261

Reviewed at https://reviews.apache.org/r/19436/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1a17c442
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1a17c442
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1a17c442

Branch: refs/heads/master
Commit: 1a17c442657def645c75247ad4b4b4983fb349df
Parents: 15679b3
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Mar 21 18:11:09 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Mar 21 18:11:09 2014 -0700

----------------------------------------------------------------------
 .../scheduler/state/StateManagerImpl.java       |   2 +-
 .../scheduler/state/TaskStateMachine.java       | 146 ++++++++++++-------
 .../scheduler/state/TaskStateMachineTest.java   |   8 +-
 3 files changed, 95 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 7371c12..0161fac 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -344,7 +344,7 @@ public class StateManagerImpl implements StateManager {
                 @Override
                 public IScheduledTask apply(IScheduledTask task) {
                   ScheduledTask mutableTask = task.newBuilder();
-                  mutableTask.setStatus(stateMachine.getState());
+                  mutableTask.setStatus(targetState);
                   mutableTask.addToTaskEvents(new TaskEvent()
                       .setTimestamp(clock.nowMillis())
                       .setStatus(targetState)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 15d1c1f..d2becea 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -21,6 +21,7 @@ import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -41,21 +42,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
 import static org.apache.aurora.scheduler.state.SideEffect.Action;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
@@ -63,6 +49,21 @@ import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.ASSIGNED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.DRAINING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.FAILED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.FINISHED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RUNNING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.STARTING;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.THROTTLED;
+import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.UNKNOWN;
 
 /**
  * State machine for a task.
@@ -81,11 +82,50 @@ class TaskStateMachine {
   private static final AtomicLong ILLEGAL_TRANSITIONS =
       Stats.exportLong("scheduler_illegal_task_state_transitions");
 
-  private final StateMachine<ScheduleStatus> stateMachine;
-  private ScheduleStatus previousState = null;
+  private final StateMachine<TaskState> stateMachine;
+  private Optional<TaskState> previousState = Optional.absent();
 
   private final Set<SideEffect> sideEffects = Sets.newHashSet();
 
+  private static final Function<IScheduledTask, TaskState> SCHEDULED_TO_TASK_STATE =
+      new Function<IScheduledTask, TaskState>() {
+        @Override
+        public TaskState apply(IScheduledTask task) {
+          return TaskState.valueOf(task.getStatus().name());
+        }
+      };
+
+  /**
+   * ScheduleStatus enum extension to account for cases where no direct state mapping exists.
+   */
+  enum TaskState {
+    INIT(Optional.of(ScheduleStatus.INIT)),
+    THROTTLED(Optional.of(ScheduleStatus.THROTTLED)),
+    PENDING(Optional.of(ScheduleStatus.PENDING)),
+    ASSIGNED(Optional.of(ScheduleStatus.ASSIGNED)),
+    STARTING(Optional.of(ScheduleStatus.STARTING)),
+    RUNNING(Optional.of(ScheduleStatus.RUNNING)),
+    FINISHED(Optional.of(ScheduleStatus.FINISHED)),
+    PREEMPTING(Optional.of(ScheduleStatus.PREEMPTING)),
+    RESTARTING(Optional.of(ScheduleStatus.RESTARTING)),
+    DRAINING(Optional.of(ScheduleStatus.DRAINING)),
+    FAILED(Optional.of(ScheduleStatus.FAILED)),
+    KILLED(Optional.of(ScheduleStatus.KILLED)),
+    KILLING(Optional.of(ScheduleStatus.KILLING)),
+    LOST(Optional.of(ScheduleStatus.LOST)),
+    UNKNOWN(Optional.of(ScheduleStatus.UNKNOWN));
+
+    private final Optional<ScheduleStatus> status;
+
+    TaskState(Optional<ScheduleStatus> status) {
+      this.status = status;
+    }
+
+    Optional<ScheduleStatus> getStatus() {
+      return status;
+    }
+  }
+
   /**
    * Creates a new task state machine representing a non-existent task.  This allows for consistent
    * state-reconciliation actions when the external system disagrees with the scheduler.
@@ -110,7 +150,7 @@ class TaskStateMachine {
     MorePreconditions.checkNotBlank(name);
     checkNotNull(task);
 
-    final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+    final TaskState initialState = task.transform(SCHEDULED_TO_TASK_STATE).or(UNKNOWN);
     if (task.isPresent()) {
       Preconditions.checkState(
           initialState != UNKNOWN,
@@ -118,11 +158,11 @@ class TaskStateMachine {
     } else {
       Preconditions.checkState(
           initialState == UNKNOWN,
-          "A task that does not exist must start un UNKNOWN state.");
+          "A task that does not exist must start in UNKNOWN state.");
     }
 
-    Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
-        ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+    Closure<Transition<TaskState>> manageTerminatedTasks = Closures.combine(
+        ImmutableList.<Closure<Transition<TaskState>>>builder()
             // Kill a task that we believe to be terminated when an attempt is made to revive.
             .add(
                 Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
@@ -131,10 +171,10 @@ class TaskStateMachine {
             .add(Closures.filter(Transition.to(UNKNOWN), addFollowupClosure(DELETE)))
             .build());
 
-    final Closure<Transition<ScheduleStatus>> manageRestartingTask =
-        new Closure<Transition<ScheduleStatus>>() {
+    final Closure<Transition<TaskState>> manageRestartingTask =
+        new Closure<Transition<TaskState>>() {
           @Override
-          public void execute(Transition<ScheduleStatus> transition) {
+          public void execute(Transition<TaskState> transition) {
             switch (transition.getTo()) {
               case ASSIGNED:
                 addFollowup(KILL);
@@ -206,10 +246,10 @@ class TaskStateMachine {
       }
     };
 
-    final Closure<Transition<ScheduleStatus>> deleteIfKilling =
+    final Closure<Transition<TaskState>> deleteIfKilling =
         Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
 
-    stateMachine = StateMachine.<ScheduleStatus>builder(name)
+    stateMachine = StateMachine.<TaskState>builder(name)
         .logTransitions()
         .initialState(initialState)
         .addState(
@@ -228,9 +268,9 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING,
                     KILLED, KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<ScheduleStatus>>() {
+                    new Closure<Transition<TaskState>>() {
                       @Override
-                      public void execute(Transition<ScheduleStatus> transition) {
+                      public void execute(Transition<TaskState> transition) {
                         switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
@@ -276,9 +316,9 @@ class TaskStateMachine {
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING,
                     KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<ScheduleStatus>>() {
+                    new Closure<Transition<TaskState>>() {
                       @Override
-                      public void execute(Transition<ScheduleStatus> transition) {
+                      public void execute(Transition<TaskState> transition) {
                         switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
@@ -328,9 +368,9 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<ScheduleStatus>>() {
+                    new Closure<Transition<TaskState>>() {
                       @Override
-                      public void execute(Transition<ScheduleStatus> transition) {
+                      public void execute(Transition<TaskState> transition) {
                         switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
@@ -413,12 +453,12 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<ScheduleStatus>>() {
+            new Closure<Transition<TaskState>>() {
               @Override
-              public void execute(final Transition<ScheduleStatus> transition) {
+              public void execute(final Transition<TaskState> transition) {
                 if (transition.isValidStateChange()) {
-                  ScheduleStatus from = transition.getFrom();
-                  ScheduleStatus to = transition.getTo();
+                  TaskState from = transition.getFrom();
+                  TaskState to = transition.getTo();
 
                   // TODO(wfarner): Clean up this hack.  This is here to suppress unnecessary work
                   // (save followed by delete), but it shows a wart with this catch-all behavior.
@@ -430,7 +470,7 @@ class TaskStateMachine {
                   if ((to != UNKNOWN) && pendingDeleteHack) {
                     addFollowup(SAVE_STATE);
                   }
-                  previousState = from;
+                  previousState = Optional.of(from);
                 } else {
                   LOG.severe("Illegal state transition attempted: " + transition);
                   ILLEGAL_TRANSITIONS.incrementAndGet();
@@ -451,8 +491,8 @@ class TaskStateMachine {
     addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
   }
 
-  private void addFollowupTransition(ScheduleStatus status) {
-    addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
+  private void addFollowupTransition(TaskState state) {
+    addFollowup(new SideEffect(STATE_CHANGE, state.getStatus()));
   }
 
   private void addFollowup(SideEffect sideEffect) {
@@ -460,10 +500,10 @@ class TaskStateMachine {
     sideEffects.add(sideEffect);
   }
 
-  private Closure<Transition<ScheduleStatus>> addFollowupClosure(final Action action) {
-    return new Closure<Transition<ScheduleStatus>>() {
+  private Closure<Transition<TaskState>> addFollowupClosure(final Action action) {
+    return new Closure<Transition<TaskState>>() {
       @Override
-      public void execute(Transition<ScheduleStatus> item) {
+      public void execute(Transition<TaskState> item) {
         addFollowup(action);
       }
     };
@@ -486,27 +526,18 @@ class TaskStateMachine {
      * state transition (e.g. storing resource consumption of a running task), we need to find
      * a different way to suppress noop transitions.
      */
-    if (stateMachine.getState() == status) {
+    TaskState taskState = TaskState.valueOf(status.name());
+    if (stateMachine.getState() == taskState) {
       return new TransitionResult(false, ImmutableSet.<SideEffect>of());
     }
 
-    boolean success = stateMachine.transition(status);
+    boolean success = stateMachine.transition(taskState);
     ImmutableSet<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
     sideEffects.clear();
     return new TransitionResult(success, transitionEffects);
   }
 
   /**
-   * Fetch the current state from the state machine.
-   * TODO(wfarner): Consider removing, the caller should know this.
-   *
-   * @return The current state.
-   */
-  public synchronized ScheduleStatus getState() {
-    return stateMachine.getState();
-  }
-
-  /**
    * Gets the previous state of this state machine.
    *
    * @return The state machine's previous state, or {@code null} if the state machine has not
@@ -514,7 +545,12 @@ class TaskStateMachine {
    */
   @Nullable
   ScheduleStatus getPreviousState() {
-    return previousState;
+    return previousState.transform(new Function<TaskState, ScheduleStatus>() {
+      @Override
+      public ScheduleStatus apply(TaskState item) {
+        return item.getStatus().orNull();
+      }
+    }).orNull();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1a17c442/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index 77380d9..e77063a 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -55,6 +55,7 @@ import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -261,11 +262,10 @@ public class TaskStateMachineTest {
   }
 
   private void legalTransition(ScheduleStatus state, Set<SideEffect.Action> expectedActions) {
-    ScheduleStatus initialStatus = stateMachine.getState();
+    ScheduleStatus previousState = stateMachine.getPreviousState();
     TransitionResult result = stateMachine.updateState(state);
     assertTrue("Transition to " + state + " was not successful", result.isSuccess());
-    assertEquals(initialStatus, stateMachine.getPreviousState());
-    assertEquals(state, stateMachine.getState());
+    assertNotEquals(previousState, stateMachine.getPreviousState());
     assertEquals(
         FluentIterable.from(expectedActions).transform(TO_SIDE_EFFECT).toSet(),
         result.getSideEffects());
@@ -285,9 +285,7 @@ public class TaskStateMachineTest {
   }
 
   private void illegalTransition(ScheduleStatus state, Set<SideEffect> sideEffects) {
-    ScheduleStatus initialStatus = stateMachine.getState();
     TransitionResult result = stateMachine.updateState(state);
-    assertEquals(initialStatus, stateMachine.getState());
     assertFalse(result.isSuccess());
     assertEquals(sideEffects, result.getSideEffects());
   }