You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/11 23:32:40 UTC

git commit: Add a one-way job update controller.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 842f478bb -> 8d985429d


Add a one-way job update controller.

Bugs closed: AURORA-613

Reviewed at https://reviews.apache.org/r/24465/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/8d985429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/8d985429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/8d985429

Branch: refs/heads/master
Commit: 8d985429d8752bfa130cfb8606101614033818a2
Parents: 842f478
Author: Bill Farner <wf...@apache.org>
Authored: Mon Aug 11 14:30:12 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Aug 11 14:30:12 2014 -0700

----------------------------------------------------------------------
 .../updater/InstanceStateProvider.java          |  31 ++
 .../scheduler/updater/InstanceUpdater.java      |  54 +---
 .../scheduler/updater/OneWayJobUpdater.java     | 320 +++++++++++++++++++
 .../scheduler/updater/StateEvaluator.java       |  54 ++++
 .../scheduler/updater/InstanceUpdaterTest.java  |  12 +-
 .../scheduler/updater/OneWayJobUpdaterTest.java | 232 ++++++++++++++
 6 files changed, 657 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
new file mode 100644
index 0000000..8b80e53
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Provider that fetches the state associated with an instance ID.
+ *
+ * @param <K> The identifier type for instances.
+ * @param <T> Instance data type.
+ */
+interface InstanceStateProvider<K, T> {
+
+  /**
+   * Fetches the latest state for an instance.
+   *
+   * @param instanceId Instance identifier.
+   * @return Data associated with {@code instanceId}.
+   */
+  T getState(K instanceId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index 7476d82..85196af 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -33,12 +33,12 @@ import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
 
 /**
  * In part of a job update, this manages the update of an individual instance. This includes
@@ -47,7 +47,7 @@ import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEED
  *
  * TODO(wfarner): This probably needs to be parameterized so that it may be reused for rollbacks.
  */
-class InstanceUpdater {
+class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
   private static final Logger LOG = Logger.getLogger(InstanceUpdater.class.getName());
 
   private final Optional<ITaskConfig> desiredState;
@@ -104,25 +104,8 @@ class InstanceUpdater {
     return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
   }
 
-  /**
-   * Evaluates the state differences between the originally-provided {@code desiredState} and the
-   * provided {@code actualState}.
-   * <p>
-   * This function should be idempotent, with the exception of an internal failure counter that
-   * increments when an updating task exits, or an active but not
-   * {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
-   *
-   * <p>
-   * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
-   * value.  Note: the caller should avoid calling this when a terminal task is moving to another
-   * terminal state.  It should also suppress deletion events for tasks that have been replaced by
-   * an active task.
-   *
-   * @param actualState The actual observed state of the task.
-   * @return the evaluation result, including the state of the instance update, and a necessary
-   *         action to perform.
-   */
-  synchronized Result evaluate(Optional<IScheduledTask> actualState) {
+  @Override
+  public synchronized StateEvaluator.Result evaluate(Optional<IScheduledTask> actualState) {
     boolean desiredPresent = desiredState.isPresent();
     boolean actualPresent = actualState.isPresent();
 
@@ -143,13 +126,13 @@ class InstanceUpdater {
     }
   }
 
-  private Result addFailureAndCheckIfFailed() {
+  private StateEvaluator.Result addFailureAndCheckIfFailed() {
     LOG.info("Observed updated task failure.");
     observedFailures++;
     return observedFailures > toleratedFailures ? FAILED : EVALUATE_ON_STATE_CHANGE;
   }
 
-  private Result handleActualAndDesiredPresent(IScheduledTask actualState) {
+  private StateEvaluator.Result handleActualAndDesiredPresent(IScheduledTask actualState) {
     Preconditions.checkState(desiredState.isPresent());
     Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
 
@@ -171,7 +154,7 @@ class InstanceUpdater {
       } else if (appearsStuck(actualState)) {
         // The task is not running, but not terminated, and appears to have been in this state
         // long enough that we should intervene.
-        Result updaterStatus = addFailureAndCheckIfFailed();
+        StateEvaluator.Result updaterStatus = addFailureAndCheckIfFailed();
         return (updaterStatus == FAILED)
             ? updaterStatus
             : KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
@@ -183,22 +166,13 @@ class InstanceUpdater {
       // This is not the configuration that we would like to run.
       if (isKillable(status)) {
         // Task is active, kill it.
-        return Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+        return StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       } else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
         // The old task has exited, it is now safe to add the new one.
-        return Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+        return StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       }
     }
 
     return EVALUATE_ON_STATE_CHANGE;
   }
-
-  enum Result {
-    EVALUATE_ON_STATE_CHANGE,
-    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    EVALUATE_AFTER_RUNNING_LIMIT,
-    SUCCEEDED,
-    FAILED
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
new file mode 100644
index 0000000..aa8b5f2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.twitter.common.util.StateMachine;
+
+import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+
+/**
+ * Controller for a one-way job update (i.e. no rollbacks).  The controller will coordinate updates
+ * of all instances within the job, and roll up the results of the individual updates into the
+ * result of the job update.
+ *
+ * @param <K> Type used to uniquely identify instances.
+ * @param <T> Instance data type.
+ */
+class OneWayJobUpdater<K, T> {
+  private final UpdateStrategy<K> strategy;
+  private final int maxFailedInstances;
+  private final Map<K, InstanceUpdate<T>> instances;
+  private final StateMachine<OneWayStatus> stateMachine =
+      StateMachine.<OneWayStatus>builder("job_update")
+          .initialState(OneWayStatus.IDLE)
+          .addState(OneWayStatus.IDLE, OneWayStatus.WORKING)
+          .addState(OneWayStatus.WORKING, OneWayStatus.SUCCEEDED, OneWayStatus.FAILED)
+          .addState(OneWayStatus.SUCCEEDED)
+          .addState(OneWayStatus.FAILED)
+          .throwOnBadTransition(true)
+          .build();
+
+  /**
+   * Creates a new one-way updater.
+   *
+   * @param strategy The strategy to decide which instances to update after a state change.
+   * @param maxFailedInstances Maximum tolerated failures before the update is considered failed.
+   * @param instanceEvaluators Evaluate the state of individual instances, and decide what actions
+   *                           must be taken to update them.
+   */
+  OneWayJobUpdater(
+      UpdateStrategy<K> strategy,
+      int maxFailedInstances,
+      Map<K, StateEvaluator<T>> instanceEvaluators) {
+
+    this.strategy = requireNonNull(strategy);
+    this.maxFailedInstances = maxFailedInstances;
+    checkArgument(!instanceEvaluators.isEmpty());
+
+    this.instances = ImmutableMap.copyOf(Maps.transformValues(
+        instanceEvaluators,
+        new Function<StateEvaluator<T>, InstanceUpdate<T>>() {
+          @Override
+          public InstanceUpdate<T> apply(StateEvaluator<T> evaluator) {
+            return new InstanceUpdate<>(evaluator);
+          }
+        }));
+  }
+
+  private static final Function<InstanceUpdate<?>, InstanceUpdateStatus> GET_STATE =
+      new Function<InstanceUpdate<?>, InstanceUpdateStatus>() {
+        @Override
+        public InstanceUpdateStatus apply(InstanceUpdate<?> manager) {
+          return manager.getState();
+        }
+      };
+
+  private static <K, T> Map<K, InstanceUpdate<T>> filterByStatus(
+      Map<K, InstanceUpdate<T>> instances,
+      InstanceUpdateStatus status) {
+
+    return ImmutableMap.copyOf(
+        Maps.filterValues(instances, Predicates.compose(Predicates.equalTo(status), GET_STATE)));
+  }
+
+  private static Optional<InstanceAction> resultToAction(Result result) {
+    switch (result) {
+      case EVALUATE_ON_STATE_CHANGE:
+        return Optional.of(InstanceAction.EVALUATE_ON_STATE_CHANGE);
+      case REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE:
+        return Optional.of(InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+      case KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE:
+        return Optional.of(InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+      case EVALUATE_AFTER_RUNNING_LIMIT:
+        return Optional.of(InstanceAction.EVALUATE_AFTER_RUNNING_LIMIT);
+      default:
+        break;
+    }
+
+    return Optional.absent();
+  }
+
+  /**
+   * Performs an evaluation of the job.  An evaluation would normally be triggered to initiate the
+   * update, as a result of a state change relevant to the update, or due to a
+   * {@link InstanceAction#EVALUATE_AFTER_RUNNING_LIMIT requested} instance re-evaluation.
+   *
+   * @param instancesNeedingUpdate Instances triggering the event, if any.
+   * @param stateProvider Provider to fetch state of instances, and pass to
+   *                      {@link StateEvaluator#evaluate(Object)}.
+   * @return The outcome of the evaluation, including the state of the job update and actions the
+   *         caller should perform on individual instances.
+   * @throws IllegalStateException if the job updater is not currently
+   *         {@link OneWayStatus#WORKING working} state, as indicated by a previous evaluation.
+   */
+  synchronized EvaluationResult<K> evaluate(
+      Set<K> instancesNeedingUpdate,
+      InstanceStateProvider<K, T> stateProvider) {
+
+    if (stateMachine.getState() == OneWayStatus.IDLE) {
+      stateMachine.transition(OneWayStatus.WORKING);
+    }
+    Preconditions.checkState(
+        stateMachine.getState() == OneWayStatus.WORKING,
+        "Attempted to evaluate an inactive job updater.");
+
+    // Call order is important here: update on-demand instances, evaluate new instances, compute
+    // job update state.
+    Map<K, InstanceAction> actions = ImmutableMap.<K, InstanceAction>builder()
+        // Re-evaluate instances that are in need of update.
+        .putAll(evaluateInstances(instancesNeedingUpdate, stateProvider))
+        // If ready to begin updating more instances, evaluate those as well.
+        .putAll(startNextInstanceGroup(stateProvider))
+        .build();
+
+    return new EvaluationResult<K>(computeJobUpdateStatus(), actions);
+  }
+
+  private Map<K, InstanceAction> evaluateInstances(
+      Set<K> instanceIds,
+      InstanceStateProvider<K, T> stateProvider) {
+
+    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
+    for (K instanceId : instanceIds) {
+      InstanceUpdate<T> update = instances.get(instanceId);
+      // Suppress state changes for updates that are not in-progress.
+      if (update.getState() == InstanceUpdateStatus.WORKING) {
+        Optional<InstanceAction> action =
+            resultToAction(update.evaluate(stateProvider.getState(instanceId)));
+        if (action.isPresent()) {
+          actions.put(instanceId, action.get());
+        }
+      }
+    }
+
+    return actions.build();
+  }
+
+  private Map<K, InstanceAction> startNextInstanceGroup(InstanceStateProvider<K, T> stateProvider) {
+    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
+
+    Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+    if (!idle.isEmpty()) {
+      Map<K, InstanceUpdate<T>> working =
+          filterByStatus(instances, InstanceUpdateStatus.WORKING);
+      for (K instance : strategy.getNextGroup(idle.keySet(), working.keySet())) {
+        Result result = instances.get(instance).evaluate(stateProvider.getState(instance));
+        Optional<InstanceAction> action = resultToAction(result);
+        if (action.isPresent()) {
+          actions.put(instance, action.get());
+        }
+      }
+    }
+
+    return actions.build();
+  }
+
+  private OneWayStatus computeJobUpdateStatus() {
+    Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+    Map<K, InstanceUpdate<T>> working =
+        filterByStatus(instances, InstanceUpdateStatus.WORKING);
+    Map<K, InstanceUpdate<T>> failed = filterByStatus(instances, InstanceUpdateStatus.FAILED);
+    // TODO(wfarner): This needs to be updated to support rollback.
+    if (failed.size() > maxFailedInstances) {
+      stateMachine.transition(OneWayStatus.FAILED);
+    } else if (working.isEmpty() && idle.isEmpty()) {
+      stateMachine.transition(OneWayStatus.SUCCEEDED);
+    }
+
+    return stateMachine.getState();
+  }
+
+  /**
+   * Container and state for the update of an individual instance.
+   */
+  private static class InstanceUpdate<T> {
+    private final StateEvaluator<T> evaluator;
+    private final StateMachine<InstanceUpdateStatus> stateMachine =
+        StateMachine.<InstanceUpdateStatus>builder("instance_update")
+            .initialState(InstanceUpdateStatus.IDLE)
+            .addState(InstanceUpdateStatus.IDLE, InstanceUpdateStatus.WORKING)
+            .addState(
+                InstanceUpdateStatus.WORKING,
+                InstanceUpdateStatus.SUCCEEDED,
+                InstanceUpdateStatus.FAILED)
+            .addState(InstanceUpdateStatus.SUCCEEDED)
+            .addState(InstanceUpdateStatus.FAILED)
+            .throwOnBadTransition(true)
+            .build();
+
+    InstanceUpdate(StateEvaluator<T> evaluator) {
+      this.evaluator = requireNonNull(evaluator);
+    }
+
+    InstanceUpdateStatus getState() {
+      return stateMachine.getState();
+    }
+
+    Result evaluate(T actualState) {
+      if (stateMachine.getState() == InstanceUpdateStatus.IDLE) {
+        stateMachine.transition(InstanceUpdateStatus.WORKING);
+      }
+
+      Result result = evaluator.evaluate(actualState);
+      if (result == Result.SUCCEEDED) {
+        stateMachine.transition(InstanceUpdateStatus.SUCCEEDED);
+      } else if (result == Result.FAILED) {
+        stateMachine.transition(InstanceUpdateStatus.FAILED);
+      }
+      return result;
+    }
+  }
+
+  private enum InstanceUpdateStatus {
+    IDLE,
+    WORKING,
+    SUCCEEDED,
+    FAILED
+  }
+
+  /**
+   * Status of the job update.
+   */
+  enum OneWayStatus {
+    IDLE,
+    WORKING,
+    SUCCEEDED,
+    FAILED
+  }
+
+  /**
+   * Action that should be performed by the caller to converge towards the desired update state.
+   */
+  enum InstanceAction {
+    EVALUATE_ON_STATE_CHANGE,
+    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    EVALUATE_AFTER_RUNNING_LIMIT
+  }
+
+  /**
+   * Result of an evaluation round.
+   */
+  static class EvaluationResult<K> {
+    private final OneWayStatus jobStatus;
+    private final Map<K, InstanceAction> instanceActions;
+
+    EvaluationResult(OneWayStatus jobStatus, Map<K, InstanceAction> instanceActions) {
+      this.jobStatus = requireNonNull(jobStatus);
+      this.instanceActions = requireNonNull(instanceActions);
+    }
+
+    public OneWayStatus getJobStatus() {
+      return jobStatus;
+    }
+
+    public Map<K, InstanceAction> getInstanceActions() {
+      return instanceActions;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof EvaluationResult)) {
+        return false;
+      }
+      @SuppressWarnings("unchecked")
+      EvaluationResult<K> other = (EvaluationResult<K>) obj;
+      return other.getJobStatus().equals(this.getJobStatus())
+          && other.getInstanceActions().equals(this.getInstanceActions());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getJobStatus(), getInstanceActions());
+    }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this)
+          .add("jobStatus", getJobStatus())
+          .add("instanceActions", getInstanceActions())
+          .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
new file mode 100644
index 0000000..dca55ef
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Determines actions that must be taken to change the configuration of a running task.
+ * <p>
+ * A state evaluator is expected to be used multiple times over the course of changing an active
+ * task's configuration.  This should be invoked every time the state of an instance changes, to
+ * determine what action to take next.  It's expected that it will eventually converge by
+ * {@link Result#SUCCEEDED succeeding} or {@link Result#FAILED failing}.
+ *
+ * @param <T> Instance state type.
+ */
+interface StateEvaluator<T> {
+
+  /**
+   * Evaluates the state differences between the desired state and the provided {@code actualState}.
+   * <p>
+   * This function should be idempotent, with the exception of an internal failure counter that
+   * increments when an updating task exits, or an active but not
+   * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING RUNNING} task takes too long to start.
+   * <p>
+   * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
+   * value.  Note: the caller should avoid calling this when a terminal task is moving to another
+   * terminal state.  It should also suppress deletion events for tasks that have been replaced by
+   * an active task.
+   *
+   * @param actualState The actual observed state of the task.
+   * @return the evaluation result, including the state of the instance update, and a necessary
+   *         action to perform.
+   */
+  Result evaluate(T actualState);
+
+  enum Result {
+    EVALUATE_ON_STATE_CHANGE,
+    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+    EVALUATE_AFTER_RUNNING_LIMIT,
+    SUCCEEDED,
+    FAILED
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index dda1b73..d7baaa6 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -34,12 +34,12 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
 import static org.junit.Assert.assertEquals;
 
 public class InstanceUpdaterTest {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
new file mode 100644
index 0000000..e3e50d7
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.InstanceAction;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OneWayJobUpdaterTest extends EasyMockTest {
+  private static final Set<Integer> EMPTY = ImmutableSet.of();
+  private static final Map<Integer, InstanceAction> NO_ACTIONS = ImmutableMap.of();
+
+  private UpdateStrategy<Integer> strategy;
+  private StateEvaluator<String> instance0;
+  private StateEvaluator<String> instance1;
+  private StateEvaluator<String> instance2;
+  private StateEvaluator<String> instance3;
+  private Map<Integer, StateEvaluator<String>> allInstances;
+  private InstanceStateProvider<Integer, String> stateProvider;
+
+  private OneWayJobUpdater<Integer, String> jobUpdater;
+
+  @Before
+  public void setUp() {
+    strategy = createMock(new Clazz<UpdateStrategy<Integer>>() { });
+    instance0 = createMock(new Clazz<StateEvaluator<String>>() { });
+    instance1 = createMock(new Clazz<StateEvaluator<String>>() { });
+    instance2 = createMock(new Clazz<StateEvaluator<String>>() { });
+    instance3 = createMock(new Clazz<StateEvaluator<String>>() { });
+    allInstances = ImmutableMap.of(
+        0, instance0,
+        1, instance1,
+        2, instance2,
+        3, instance3);
+    stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { });
+  }
+
+  private void evaluate(OneWayStatus expectedStatus, Map<Integer, InstanceAction> expectedActions) {
+    assertEquals(
+        new EvaluationResult<>(expectedStatus, expectedActions),
+        jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider));
+  }
+
+  private void evaluate(
+      int instanceId,
+      OneWayStatus expectedStatus,
+      Map<Integer, InstanceAction> expectedActions) {
+
+    assertEquals(
+        new EvaluationResult<>(expectedStatus, expectedActions),
+        jobUpdater.evaluate(ImmutableSet.of(instanceId), stateProvider));
+  }
+
+  private void expectEvaluate(
+      int instanceId,
+      StateEvaluator<String> instanceMock,
+      String state,
+      Result result) {
+
+    expect(stateProvider.getState(instanceId)).andReturn(state);
+    expect(instanceMock.evaluate(state)).andReturn(result);
+  }
+
+  @Test
+  public void testSuccessfulUpdate() {
+    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+        .andReturn(ImmutableSet.of(0, 2));
+    String s0 = "0";
+    String s1 = "1";
+    String s2 = "2";
+    String s3 = "3";
+    expectEvaluate(
+        0,
+        instance0,
+        s0,
+        KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+    expectEvaluate(
+        2,
+        instance2,
+        s2,
+        REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+
+    expectEvaluate(0, instance0, s0, EVALUATE_ON_STATE_CHANGE);
+    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(0, 2))).andReturn(EMPTY);
+    expectEvaluate(0, instance0, s0, SUCCEEDED);
+    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(2))).andReturn(EMPTY);
+    expectEvaluate(2, instance2, s2, SUCCEEDED);
+    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), EMPTY))
+        .andReturn(ImmutableSet.of(1, 3));
+    expectEvaluate(
+        1,
+        instance1,
+        s1,
+        SUCCEEDED);
+    expectEvaluate(
+        3,
+        instance3,
+        s3,
+        EVALUATE_AFTER_RUNNING_LIMIT);
+    expectEvaluate(3, instance3, s3, SUCCEEDED);
+
+    control.replay();
+
+    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+    evaluate(
+        OneWayStatus.WORKING,
+        ImmutableMap.of(
+            0, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+            2, InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE));
+    evaluate(
+        0,
+        OneWayStatus.WORKING,
+        ImmutableMap.of(0, InstanceAction.EVALUATE_ON_STATE_CHANGE));
+    evaluate(
+        0,
+        OneWayStatus.WORKING,
+        NO_ACTIONS);
+    evaluate(
+        2,
+        OneWayStatus.WORKING,
+        ImmutableMap.of(
+            3, InstanceAction.EVALUATE_AFTER_RUNNING_LIMIT));
+    evaluate(
+        3,
+        OneWayStatus.SUCCEEDED,
+        NO_ACTIONS);
+  }
+
+  @Test
+  public void testFailedUpdate() {
+    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+        .andReturn(ImmutableSet.of(0, 1));
+    String s0 = "0";
+    String s1 = "1";
+    expectEvaluate(
+        0,
+        instance0,
+        s0,
+        FAILED);
+    expectEvaluate(
+        1,
+        instance1,
+        s1,
+        KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+
+    control.replay();
+
+    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+    evaluate(
+        OneWayStatus.FAILED,
+        ImmutableMap.of(
+            1, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE));
+
+    // The updater should now reject further attempts to evaluate.
+    try {
+      jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider);
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected.
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadInput() {
+    control.replay();
+
+    new OneWayJobUpdater<>(strategy, 0, ImmutableMap.<Integer, StateEvaluator<String>>of());
+  }
+
+  @Test
+  public void testEvaluateCompletedInstance() {
+    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+        .andReturn(ImmutableSet.of(0));
+    expect(strategy.getNextGroup(ImmutableSet.of(1, 2, 3), EMPTY))
+        .andReturn(ImmutableSet.<Integer>of());
+    String s0 = "0";
+    expectEvaluate(
+        0,
+        instance0,
+        s0,
+        SUCCEEDED);
+
+    control.replay();
+
+    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+    evaluate(
+        OneWayStatus.WORKING,
+        NO_ACTIONS);
+
+    // Instance 0 is already considered finished, so any further notifications of its state will
+    // no-op.
+    evaluate(
+        0,
+        OneWayStatus.WORKING,
+        NO_ACTIONS);
+  }
+}