You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/08/11 23:32:40 UTC
git commit: Add a one-way job update controller.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 842f478bb -> 8d985429d
Add a one-way job update controller.
Bugs closed: AURORA-613
Reviewed at https://reviews.apache.org/r/24465/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/8d985429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/8d985429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/8d985429
Branch: refs/heads/master
Commit: 8d985429d8752bfa130cfb8606101614033818a2
Parents: 842f478
Author: Bill Farner <wf...@apache.org>
Authored: Mon Aug 11 14:30:12 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Aug 11 14:30:12 2014 -0700
----------------------------------------------------------------------
.../updater/InstanceStateProvider.java | 31 ++
.../scheduler/updater/InstanceUpdater.java | 54 +---
.../scheduler/updater/OneWayJobUpdater.java | 320 +++++++++++++++++++
.../scheduler/updater/StateEvaluator.java | 54 ++++
.../scheduler/updater/InstanceUpdaterTest.java | 12 +-
.../scheduler/updater/OneWayJobUpdaterTest.java | 232 ++++++++++++++
6 files changed, 657 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
new file mode 100644
index 0000000..8b80e53
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceStateProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Provider that fetches the state associated with an instance ID.
+ * <p>
+ * {@link OneWayJobUpdater} calls this during an evaluation to obtain the state that it then passes
+ * to the {@link StateEvaluator} registered for the same instance.
+ *
+ * @param <K> The identifier type for instances.
+ * @param <T> Instance data type.
+ */
+interface InstanceStateProvider<K, T> {
+
+ /**
+ * Fetches the latest state for an instance.
+ *
+ * @param instanceId Instance identifier.
+ * @return Data associated with {@code instanceId}.
+ */
+ T getState(K instanceId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index 7476d82..85196af 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -33,12 +33,12 @@ import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.FAILED;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
/**
* In part of a job update, this manages the update of an individual instance. This includes
@@ -47,7 +47,7 @@ import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEED
*
* TODO(wfarner): This probably needs to be parameterized so that it may be reused for rollbacks.
*/
-class InstanceUpdater {
+class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
private static final Logger LOG = Logger.getLogger(InstanceUpdater.class.getName());
private final Optional<ITaskConfig> desiredState;
@@ -104,25 +104,8 @@ class InstanceUpdater {
return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
}
- /**
- * Evaluates the state differences between the originally-provided {@code desiredState} and the
- * provided {@code actualState}.
- * <p>
- * This function should be idempotent, with the exception of an internal failure counter that
- * increments when an updating task exits, or an active but not
- * {@link ScheduleStatus#RUNNING RUNNING} task takes too long to start.
- *
- * <p>
- * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
- * value. Note: the caller should avoid calling this when a terminal task is moving to another
- * terminal state. It should also suppress deletion events for tasks that have been replaced by
- * an active task.
- *
- * @param actualState The actual observed state of the task.
- * @return the evaluation result, including the state of the instance update, and a necessary
- * action to perform.
- */
- synchronized Result evaluate(Optional<IScheduledTask> actualState) {
+ @Override
+ public synchronized StateEvaluator.Result evaluate(Optional<IScheduledTask> actualState) {
boolean desiredPresent = desiredState.isPresent();
boolean actualPresent = actualState.isPresent();
@@ -143,13 +126,13 @@ class InstanceUpdater {
}
}
- private Result addFailureAndCheckIfFailed() {
+ private StateEvaluator.Result addFailureAndCheckIfFailed() {
LOG.info("Observed updated task failure.");
observedFailures++;
return observedFailures > toleratedFailures ? FAILED : EVALUATE_ON_STATE_CHANGE;
}
- private Result handleActualAndDesiredPresent(IScheduledTask actualState) {
+ private StateEvaluator.Result handleActualAndDesiredPresent(IScheduledTask actualState) {
Preconditions.checkState(desiredState.isPresent());
Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
@@ -171,7 +154,7 @@ class InstanceUpdater {
} else if (appearsStuck(actualState)) {
// The task is not running, but not terminated, and appears to have been in this state
// long enough that we should intervene.
- Result updaterStatus = addFailureAndCheckIfFailed();
+ StateEvaluator.Result updaterStatus = addFailureAndCheckIfFailed();
return (updaterStatus == FAILED)
? updaterStatus
: KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
@@ -183,22 +166,13 @@ class InstanceUpdater {
// This is not the configuration that we would like to run.
if (isKillable(status)) {
// Task is active, kill it.
- return Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+ return StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
} else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
// The old task has exited, it is now safe to add the new one.
- return Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+ return StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
}
}
return EVALUATE_ON_STATE_CHANGE;
}
-
- enum Result {
- EVALUATE_ON_STATE_CHANGE,
- REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
- KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
- EVALUATE_AFTER_RUNNING_LIMIT,
- SUCCEEDED,
- FAILED
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
new file mode 100644
index 0000000..aa8b5f2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.twitter.common.util.StateMachine;
+
+import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+
+/**
+ * Controller for a one-way job update (i.e. no rollbacks). The controller will coordinate updates
+ * of all instances within the job, and roll up the results of the individual updates into the
+ * result of the job update.
+ * <p>
+ * The job and each of its instances are tracked with a state machine that starts idle, becomes
+ * working on its first evaluation, and ends as either succeeded or failed. Every call to
+ * {@code evaluate} re-evaluates the requested in-progress instances, asks the
+ * {@link UpdateStrategy} which idle instances to start next, and then recomputes the job status
+ * from the instance states. {@code evaluate} is {@code synchronized}.
+ *
+ * @param <K> Type used to uniquely identify instances.
+ * @param <T> Instance data type.
+ */
+class OneWayJobUpdater<K, T> {
+ private final UpdateStrategy<K> strategy;
+ private final int maxFailedInstances;
+ private final Map<K, InstanceUpdate<T>> instances;
+ private final StateMachine<OneWayStatus> stateMachine =
+ StateMachine.<OneWayStatus>builder("job_update")
+ .initialState(OneWayStatus.IDLE)
+ .addState(OneWayStatus.IDLE, OneWayStatus.WORKING)
+ .addState(OneWayStatus.WORKING, OneWayStatus.SUCCEEDED, OneWayStatus.FAILED)
+ .addState(OneWayStatus.SUCCEEDED)
+ .addState(OneWayStatus.FAILED)
+ .throwOnBadTransition(true)
+ .build();
+
+ /**
+ * Creates a new one-way updater.
+ *
+ * @param strategy The strategy to decide which instances to update after a state change.
+ * @param maxFailedInstances Maximum tolerated failures before the update is considered failed;
+ * the job fails once more than this many instances have failed.
+ * @param instanceEvaluators Evaluate the state of individual instances, and decide what actions
+ * must be taken to update them.
+ * @throws NullPointerException if {@code strategy} is {@code null}.
+ * @throws IllegalArgumentException if {@code instanceEvaluators} is empty.
+ */
+ OneWayJobUpdater(
+ UpdateStrategy<K> strategy,
+ int maxFailedInstances,
+ Map<K, StateEvaluator<T>> instanceEvaluators) {
+
+ this.strategy = requireNonNull(strategy);
+ this.maxFailedInstances = maxFailedInstances;
+ checkArgument(!instanceEvaluators.isEmpty());
+ // Copy eagerly: transformValues alone is a lazy view that re-applies the function on access.
+ this.instances = ImmutableMap.copyOf(Maps.transformValues(
+ instanceEvaluators,
+ new Function<StateEvaluator<T>, InstanceUpdate<T>>() {
+ @Override
+ public InstanceUpdate<T> apply(StateEvaluator<T> evaluator) {
+ return new InstanceUpdate<>(evaluator);
+ }
+ }));
+ }
+ /** Reads the current status of an instance update. */
+ private static final Function<InstanceUpdate<?>, InstanceUpdateStatus> GET_STATE =
+ new Function<InstanceUpdate<?>, InstanceUpdateStatus>() {
+ @Override
+ public InstanceUpdateStatus apply(InstanceUpdate<?> manager) {
+ return manager.getState();
+ }
+ };
+ /**
+ * Returns an immutable snapshot of the entries in {@code instances} whose update is currently
+ * in {@code status}.
+ */
+ private static <K, T> Map<K, InstanceUpdate<T>> filterByStatus(
+ Map<K, InstanceUpdate<T>> instances,
+ InstanceUpdateStatus status) {
+
+ return ImmutableMap.copyOf(
+ Maps.filterValues(instances, Predicates.compose(Predicates.equalTo(status), GET_STATE)));
+ }
+ /**
+ * Translates an evaluator result into the action the caller should take. Results without a
+ * matching action (the terminal {@code SUCCEEDED} and {@code FAILED}) map to absent.
+ */
+ private static Optional<InstanceAction> resultToAction(Result result) {
+ switch (result) {
+ case EVALUATE_ON_STATE_CHANGE:
+ return Optional.of(InstanceAction.EVALUATE_ON_STATE_CHANGE);
+ case REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE:
+ return Optional.of(InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ case KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE:
+ return Optional.of(InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ case EVALUATE_AFTER_RUNNING_LIMIT:
+ return Optional.of(InstanceAction.EVALUATE_AFTER_RUNNING_LIMIT);
+ default:
+ break;
+ }
+
+ return Optional.absent();
+ }
+
+ /**
+ * Performs an evaluation of the job. An evaluation would normally be triggered to initiate the
+ * update, as a result of a state change relevant to the update, or due to a
+ * {@link InstanceAction#EVALUATE_AFTER_RUNNING_LIMIT requested} instance re-evaluation.
+ * <p>
+ * The first call moves the job from {@link OneWayStatus#IDLE idle} to
+ * {@link OneWayStatus#WORKING working}. Instances in {@code instancesNeedingUpdate} whose update
+ * is not in progress are skipped.
+ *
+ * @param instancesNeedingUpdate Instances triggering the event, if any. Each must be an instance
+ * this updater was constructed with.
+ * @param stateProvider Provider to fetch state of instances, and pass to
+ * {@link StateEvaluator#evaluate(Object)}.
+ * @return The outcome of the evaluation, including the state of the job update and actions the
+ * caller should perform on individual instances.
+ * @throws IllegalStateException if the job updater is not currently in the
+ * {@link OneWayStatus#WORKING working} state, as indicated by a previous evaluation.
+ */
+ synchronized EvaluationResult<K> evaluate(
+ Set<K> instancesNeedingUpdate,
+ InstanceStateProvider<K, T> stateProvider) {
+
+ if (stateMachine.getState() == OneWayStatus.IDLE) {
+ stateMachine.transition(OneWayStatus.WORKING);
+ }
+ Preconditions.checkState(
+ stateMachine.getState() == OneWayStatus.WORKING,
+ "Attempted to evaluate an inactive job updater.");
+
+ // Call order is important here: update on-demand instances, evaluate new instances, compute
+ // job update state.
+ Map<K, InstanceAction> actions = ImmutableMap.<K, InstanceAction>builder()
+ // Re-evaluate instances that are in need of update.
+ .putAll(evaluateInstances(instancesNeedingUpdate, stateProvider))
+ // If ready to begin updating more instances, evaluate those as well.
+ .putAll(startNextInstanceGroup(stateProvider))
+ .build();
+
+ return new EvaluationResult<K>(computeJobUpdateStatus(), actions);
+ }
+ /**
+ * Re-evaluates the given instances, skipping any whose update is not in progress, and collects
+ * the actions that result.
+ */
+ private Map<K, InstanceAction> evaluateInstances(
+ Set<K> instanceIds,
+ InstanceStateProvider<K, T> stateProvider) {
+
+ ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
+ for (K instanceId : instanceIds) {
+ InstanceUpdate<T> update = instances.get(instanceId);
+ // Suppress state changes for updates that are not in-progress.
+ if (update.getState() == InstanceUpdateStatus.WORKING) {
+ Optional<InstanceAction> action =
+ resultToAction(update.evaluate(stateProvider.getState(instanceId)));
+ if (action.isPresent()) {
+ actions.put(instanceId, action.get());
+ }
+ }
+ }
+
+ return actions.build();
+ }
+ /**
+ * If any instances are still idle, asks the strategy which of them to begin updating (given
+ * the instances already in progress), evaluates each instance it returns, and collects the
+ * actions that result.
+ */
+ private Map<K, InstanceAction> startNextInstanceGroup(InstanceStateProvider<K, T> stateProvider) {
+ ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
+
+ Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+ if (!idle.isEmpty()) {
+ Map<K, InstanceUpdate<T>> working =
+ filterByStatus(instances, InstanceUpdateStatus.WORKING);
+ for (K instance : strategy.getNextGroup(idle.keySet(), working.keySet())) {
+ Result result = instances.get(instance).evaluate(stateProvider.getState(instance));
+ Optional<InstanceAction> action = resultToAction(result);
+ if (action.isPresent()) {
+ actions.put(instance, action.get());
+ }
+ }
+ }
+
+ return actions.build();
+ }
+ /**
+ * Rolls the instance states up into the job status: failed once more than
+ * {@code maxFailedInstances} instances have failed, succeeded once no instance is idle or
+ * working, and unchanged otherwise.
+ */
+ private OneWayStatus computeJobUpdateStatus() {
+ Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+ Map<K, InstanceUpdate<T>> working =
+ filterByStatus(instances, InstanceUpdateStatus.WORKING);
+ Map<K, InstanceUpdate<T>> failed = filterByStatus(instances, InstanceUpdateStatus.FAILED);
+ // TODO(wfarner): This needs to be updated to support rollback.
+ if (failed.size() > maxFailedInstances) {
+ stateMachine.transition(OneWayStatus.FAILED);
+ } else if (working.isEmpty() && idle.isEmpty()) {
+ stateMachine.transition(OneWayStatus.SUCCEEDED);
+ }
+
+ return stateMachine.getState();
+ }
+
+ /**
+ * Container and state for the update of an individual instance. The update starts idle, becomes
+ * working on its first evaluation, and ends when the evaluator reports success or failure.
+ */
+ private static class InstanceUpdate<T> {
+ private final StateEvaluator<T> evaluator;
+ private final StateMachine<InstanceUpdateStatus> stateMachine =
+ StateMachine.<InstanceUpdateStatus>builder("instance_update")
+ .initialState(InstanceUpdateStatus.IDLE)
+ .addState(InstanceUpdateStatus.IDLE, InstanceUpdateStatus.WORKING)
+ .addState(
+ InstanceUpdateStatus.WORKING,
+ InstanceUpdateStatus.SUCCEEDED,
+ InstanceUpdateStatus.FAILED)
+ .addState(InstanceUpdateStatus.SUCCEEDED)
+ .addState(InstanceUpdateStatus.FAILED)
+ .throwOnBadTransition(true)
+ .build();
+
+ InstanceUpdate(StateEvaluator<T> evaluator) {
+ this.evaluator = requireNonNull(evaluator);
+ }
+
+ InstanceUpdateStatus getState() {
+ return stateMachine.getState();
+ }
+
+ Result evaluate(T actualState) {
+ if (stateMachine.getState() == InstanceUpdateStatus.IDLE) {
+ stateMachine.transition(InstanceUpdateStatus.WORKING);
+ }
+
+ Result result = evaluator.evaluate(actualState);
+ if (result == Result.SUCCEEDED) {
+ stateMachine.transition(InstanceUpdateStatus.SUCCEEDED);
+ } else if (result == Result.FAILED) {
+ stateMachine.transition(InstanceUpdateStatus.FAILED);
+ }
+ return result;
+ }
+ }
+ /**
+ * Status of the update of an individual instance.
+ */
+ private enum InstanceUpdateStatus {
+ IDLE,
+ WORKING,
+ SUCCEEDED,
+ FAILED
+ }
+
+ /**
+ * Status of the job update.
+ */
+ enum OneWayStatus {
+ IDLE,
+ WORKING,
+ SUCCEEDED,
+ FAILED
+ }
+
+ /**
+ * Action that should be performed by the caller to converge towards the desired update state.
+ * Each value corresponds to the {@link StateEvaluator.Result} of the same name.
+ */
+ enum InstanceAction {
+ EVALUATE_ON_STATE_CHANGE,
+ REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ EVALUATE_AFTER_RUNNING_LIMIT
+ }
+
+ /**
+ * Result of an evaluation round.
+ */
+ static class EvaluationResult<K> {
+ private final OneWayStatus jobStatus;
+ private final Map<K, InstanceAction> instanceActions;
+
+ EvaluationResult(OneWayStatus jobStatus, Map<K, InstanceAction> instanceActions) {
+ this.jobStatus = requireNonNull(jobStatus);
+ this.instanceActions = requireNonNull(instanceActions);
+ }
+
+ public OneWayStatus getJobStatus() {
+ return jobStatus;
+ }
+
+ public Map<K, InstanceAction> getInstanceActions() {
+ return instanceActions;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof EvaluationResult)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ EvaluationResult<K> other = (EvaluationResult<K>) obj;
+ return other.getJobStatus().equals(this.getJobStatus())
+ && other.getInstanceActions().equals(this.getInstanceActions());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getJobStatus(), getInstanceActions());
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("jobStatus", getJobStatus())
+ .add("instanceActions", getInstanceActions())
+ .toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
new file mode 100644
index 0000000..dca55ef
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Determines actions that must be taken to change the configuration of a running task.
+ * <p>
+ * A state evaluator is expected to be used multiple times over the course of changing an active
+ * task's configuration. This should be invoked every time the state of an instance changes, to
+ * determine what action to take next. It's expected that it will eventually converge by
+ * {@link Result#SUCCEEDED succeeding} or {@link Result#FAILED failing}.
+ * <p>
+ * {@link OneWayJobUpdater} treats {@link Result#SUCCEEDED} and {@link Result#FAILED} as the end of
+ * an instance's update, and passes every other result on to its caller as an action to perform.
+ *
+ * @param <T> Instance state type.
+ */
+interface StateEvaluator<T> {
+
+ /**
+ * Evaluates the state differences between the desired state and the provided {@code actualState}.
+ * <p>
+ * This function should be idempotent, with the exception of an internal failure counter that
+ * increments when an updating task exits, or an active but not
+ * {@link org.apache.aurora.gen.ScheduleStatus#RUNNING RUNNING} task takes too long to start.
+ * <p>
+ * It is the responsibility of the caller to ensure that the {@code actualState} is the latest
+ * value. Note: the caller should avoid calling this when a terminal task is moving to another
+ * terminal state. It should also suppress deletion events for tasks that have been replaced by
+ * an active task.
+ *
+ * @param actualState The actual observed state of the task.
+ * @return the evaluation result, including the state of the instance update, and a necessary
+ * action to perform.
+ */
+ Result evaluate(T actualState);
+ /**
+ * Outcome of an evaluation. {@link #SUCCEEDED} and {@link #FAILED} conclude the instance update;
+ * the remaining values name the follow-up step requested of the caller.
+ */
+ enum Result {
+ EVALUATE_ON_STATE_CHANGE,
+ REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ EVALUATE_AFTER_RUNNING_LIMIT,
+ SUCCEEDED,
+ FAILED
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index dda1b73..d7baaa6 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -34,12 +34,12 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_AFTER_RUNNING_LIMIT;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.InstanceUpdater.Result.SUCCEEDED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
import static org.junit.Assert.assertEquals;
public class InstanceUpdaterTest {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d985429/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
new file mode 100644
index 0000000..e3e50d7
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.InstanceAction;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_RUNNING_LIMIT;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OneWayJobUpdaterTest extends EasyMockTest {
+ private static final Set<Integer> EMPTY = ImmutableSet.of();
+ private static final Map<Integer, InstanceAction> NO_ACTIONS = ImmutableMap.of();
+
+ private UpdateStrategy<Integer> strategy;
+ private StateEvaluator<String> instance0;
+ private StateEvaluator<String> instance1;
+ private StateEvaluator<String> instance2;
+ private StateEvaluator<String> instance3;
+ private Map<Integer, StateEvaluator<String>> allInstances;
+ private InstanceStateProvider<Integer, String> stateProvider;
+
+ private OneWayJobUpdater<Integer, String> jobUpdater;
+
+ @Before
+ public void setUp() {
+ strategy = createMock(new Clazz<UpdateStrategy<Integer>>() { });
+ instance0 = createMock(new Clazz<StateEvaluator<String>>() { });
+ instance1 = createMock(new Clazz<StateEvaluator<String>>() { });
+ instance2 = createMock(new Clazz<StateEvaluator<String>>() { });
+ instance3 = createMock(new Clazz<StateEvaluator<String>>() { });
+ allInstances = ImmutableMap.of(
+ 0, instance0,
+ 1, instance1,
+ 2, instance2,
+ 3, instance3);
+ stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { });
+ }
+ // Evaluates with no triggering instances and checks the resulting job status and actions.
+ private void evaluate(OneWayStatus expectedStatus, Map<Integer, InstanceAction> expectedActions) {
+ assertEquals(
+ new EvaluationResult<>(expectedStatus, expectedActions),
+ jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider));
+ }
+ // Evaluates on behalf of a single instance and checks the resulting job status and actions.
+ private void evaluate(
+ int instanceId,
+ OneWayStatus expectedStatus,
+ Map<Integer, InstanceAction> expectedActions) {
+
+ assertEquals(
+ new EvaluationResult<>(expectedStatus, expectedActions),
+ jobUpdater.evaluate(ImmutableSet.of(instanceId), stateProvider));
+ }
+ // Expects one state lookup for the instance, then one evaluation of that state giving result.
+ private void expectEvaluate(
+ int instanceId,
+ StateEvaluator<String> instanceMock,
+ String state,
+ Result result) {
+
+ expect(stateProvider.getState(instanceId)).andReturn(state);
+ expect(instanceMock.evaluate(state)).andReturn(result);
+ }
+ // Updates four instances in two groups, {0, 2} then {1, 3}, until the whole job succeeds.
+ @Test
+ public void testSuccessfulUpdate() {
+ expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+ .andReturn(ImmutableSet.of(0, 2));
+ String s0 = "0";
+ String s1 = "1";
+ String s2 = "2";
+ String s3 = "3";
+ expectEvaluate(
+ 0,
+ instance0,
+ s0,
+ KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+ expectEvaluate(
+ 2,
+ instance2,
+ s2,
+ REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+
+ expectEvaluate(0, instance0, s0, EVALUATE_ON_STATE_CHANGE);
+ expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(0, 2))).andReturn(EMPTY);
+ expectEvaluate(0, instance0, s0, SUCCEEDED);
+ expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(2))).andReturn(EMPTY);
+ expectEvaluate(2, instance2, s2, SUCCEEDED);
+ expect(strategy.getNextGroup(ImmutableSet.of(1, 3), EMPTY))
+ .andReturn(ImmutableSet.of(1, 3));
+ expectEvaluate(
+ 1,
+ instance1,
+ s1,
+ SUCCEEDED);
+ expectEvaluate(
+ 3,
+ instance3,
+ s3,
+ EVALUATE_AFTER_RUNNING_LIMIT);
+ expectEvaluate(3, instance3, s3, SUCCEEDED);
+
+ control.replay();
+
+ jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+ evaluate(
+ OneWayStatus.WORKING,
+ ImmutableMap.of(
+ 0, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
+ 2, InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE));
+ evaluate(
+ 0,
+ OneWayStatus.WORKING,
+ ImmutableMap.of(0, InstanceAction.EVALUATE_ON_STATE_CHANGE));
+ evaluate(
+ 0,
+ OneWayStatus.WORKING,
+ NO_ACTIONS);
+ evaluate(
+ 2,
+ OneWayStatus.WORKING,
+ ImmutableMap.of(
+ 3, InstanceAction.EVALUATE_AFTER_RUNNING_LIMIT));
+ evaluate(
+ 3,
+ OneWayStatus.SUCCEEDED,
+ NO_ACTIONS);
+ }
+ // With no tolerated failures, one failed instance fails the job and blocks further evaluation.
+ @Test
+ public void testFailedUpdate() {
+ expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+ .andReturn(ImmutableSet.of(0, 1));
+ String s0 = "0";
+ String s1 = "1";
+ expectEvaluate(
+ 0,
+ instance0,
+ s0,
+ FAILED);
+ expectEvaluate(
+ 1,
+ instance1,
+ s1,
+ KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+
+ control.replay();
+
+ jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+ evaluate(
+ OneWayStatus.FAILED,
+ ImmutableMap.of(
+ 1, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE));
+
+ // The updater should now reject further attempts to evaluate.
+ try {
+ jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider);
+ fail();
+ } catch (IllegalStateException e) {
+ // Expected.
+ }
+ }
+ // An updater cannot be constructed from an empty set of instance evaluators.
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadInput() {
+ control.replay();
+
+ new OneWayJobUpdater<>(strategy, 0, ImmutableMap.<Integer, StateEvaluator<String>>of());
+ }
+ // A state change for an instance whose update already finished produces no actions.
+ @Test
+ public void testEvaluateCompletedInstance() {
+ expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
+ .andReturn(ImmutableSet.of(0));
+ expect(strategy.getNextGroup(ImmutableSet.of(1, 2, 3), EMPTY))
+ .andReturn(ImmutableSet.<Integer>of());
+ String s0 = "0";
+ expectEvaluate(
+ 0,
+ instance0,
+ s0,
+ SUCCEEDED);
+
+ control.replay();
+
+ jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
+
+ evaluate(
+ OneWayStatus.WORKING,
+ NO_ACTIONS);
+
+ // Instance 0 is already considered finished, so any further notifications of its state will
+ // no-op.
+ evaluate(
+ 0,
+ OneWayStatus.WORKING,
+ NO_ACTIONS);
+ }
+}