You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text rendering.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/13 21:33:43 UTC
[3/3] git commit: Refactor StateManagerImpl and TaskStateMachine for less code and better readability.
Refactor StateManagerImpl and TaskStateMachine for less code and better readability.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/70adc19a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/70adc19a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/70adc19a
Branch: refs/heads/wfarner/state_machine_refactor
Commit: 70adc19ad99be08de93fd2b501b7f8bab83b0953
Parents: 801dcfb
Author: Bill Farner <bi...@twitter.com>
Authored: Thu Jan 9 16:07:56 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Jan 13 12:26:17 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/state/SideEffect.java | 94 +++
.../scheduler/state/SideEffectStorage.java | 169 ------
.../scheduler/state/StateManagerImpl.java | 500 +++++++--------
.../scheduler/state/TaskStateMachine.java | 546 +++++++----------
.../aurora/scheduler/state/WorkCommand.java | 33 -
.../aurora/scheduler/storage/Storage.java | 4 +
.../aurora/scheduler/storage/TaskStore.java | 2 +
.../scheduler/state/StateManagerImplTest.java | 95 ++-
.../scheduler/state/TaskStateMachineTest.java | 601 ++++++++++++++-----
9 files changed, 1116 insertions(+), 928 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..5759691
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * Descriptions of the different types of external work commands that task state machines may
+ * trigger.
+ */
+class SideEffect {
+ private final Action action;
+ private final Optional<ScheduleStatus> nextState;
+
+ SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+ this.action = action;
+ if (action == Action.STATE_CHANGE) {
+ Preconditions.checkArgument(
+ nextState.isPresent(),
+ "A next state must be provided for a state change action.");
+ }
+ this.nextState = nextState;
+ }
+
+ public Action getAction() {
+ return action;
+ }
+
+ public Optional<ScheduleStatus> getNextState() {
+ return nextState;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof SideEffect)) {
+ return false;
+ }
+
+ SideEffect other = (SideEffect) o;
+ return Objects.equal(action, other.action)
+ && Objects.equal(nextState, other.nextState);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(action, nextState);
+ }
+
+ @Override
+ public String toString() {
+ if (nextState.isPresent()) {
+ return action.toString() + " " + nextState.get();
+ } else {
+ return action.toString();
+ }
+ }
+
+ enum Action {
+ // Send an instruction for the runner of this task to kill the task.
+ KILL,
+
+ // Create a new state machine with a copy of this task.
+ RESCHEDULE,
+
+ // Update the task's state (schedule status) in the persistent store to match the state machine.
+ SAVE_STATE,
+
+ // Delete this task from the persistent store.
+ DELETE,
+
+ // Increment the failure count for this task.
+ INCREMENT_FAILURES,
+
+ // Perform an additional state change on the task.
+ STATE_CHANGE
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
- private final Queue<PubsubEvent> events = Lists.newLinkedList();
- @VisibleForTesting
- Queue<PubsubEvent> getEvents() {
- return events;
- }
-
- private AtomicBoolean inOperation = new AtomicBoolean(false);
-
- private final Storage storage;
- private final OperationFinalizer operationFinalizer;
- private final EventSink eventSink;
-
- interface OperationFinalizer {
- /**
- * Performs any work necessary to complete the operation.
- * This is executed in the context of a write operation, immediately after the work
- * executes normally.
- * NOTE: At present, this is executed for every nesting level of operations, rather than
- * at the completion of the top-level operation.
- * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
- * for more detail.
- *
- * @param work Work to finalize.
- * @param storeProvider Mutable store reference.
- */
- void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
- }
-
- SideEffectStorage(
- Storage storage,
- OperationFinalizer operationFinalizer,
- EventSink eventSink) {
-
- this.storage = checkNotNull(storage);
- this.operationFinalizer = checkNotNull(operationFinalizer);
- this.eventSink = checkNotNull(eventSink);
- }
-
- /**
- * Perform a unit of work in a mutating operation. This supports nesting/reentrancy.
- *
- * @param work Work to perform.
- * @param <T> Work return type
- * @param <E> Work exception type.
- * @return The work return value.
- * @throws E The work exception.
- */
- <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
- return storage.write(executeSideEffectsAfter(work));
- }
-
- <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return storage.consistentRead(work);
- }
-
- /**
- * Work that has side effects external to the storage system.
- * Work may add side effect and pubsub events, which will be executed/sent upon normal
- * completion of the operation.
- *
- * @param <T> Work return type.
- * @param <E> Work exception type.
- */
- abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
- protected final void addTaskEvent(PubsubEvent notice) {
- Preconditions.checkState(inOperation.get());
- events.add(Preconditions.checkNotNull(notice));
- }
- }
-
- /**
- * Work with side effects which does not throw checked exceptions.
- *
- * @param <T> Work return type.
- */
- abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
- }
-
- /**
- * Work with side effects that does not have a return value.
- *
- * @param <E> Work exception type.
- */
- abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
- @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
- execute(storeProvider);
- return null;
- }
-
- abstract void execute(MutableStoreProvider storeProvider) throws E;
- }
-
- /**
- * Work with side effects which does not throw checked exceptions or have a return
- * value.
- */
- abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
- }
-
- private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
- final SideEffectWork<T, E> work) {
-
- return new MutateWork<T, E>() {
- @Override public T apply(MutableStoreProvider storeProvider) throws E {
- boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
- try {
- T result = work.apply(storeProvider);
-
- // TODO(William Farner): Maintaining this since it matches prior behavior, but this
- // seems wrong. Double-check whether this is necessary, or if only the top-level
- // operation should be executing the finalizer. Update doc on OperationFinalizer
- // once this is assessed.
- operationFinalizer.finalize(work, storeProvider);
- if (topLevelOperation) {
- while (!events.isEmpty()) {
- eventSink.post(events.remove());
- }
- }
- return result;
- } finally {
- if (topLevelOperation) {
- inOperation.set(false);
- }
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6078eee..819d921 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,42 +15,48 @@
*/
package org.apache.aurora.scheduler.state;
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -62,11 +68,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.transform;
import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
/**
* Manager of all persistence-related operations for the scheduler. Acts as a controller for
@@ -78,38 +82,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
- private final SideEffectStorage storage;
- @VisibleForTesting
- SideEffectStorage getStorage() {
- return storage;
- }
-
+ private final Storage storage;
+ private final Clock clock;
+ private final Driver driver;
private final TaskIdGenerator taskIdGenerator;
-
- // Work queue to receive state machine side effect work.
- // Items are sorted to place DELETE entries last. This is to ensure that within an operation,
- // a delete is always processed after a state transition.
- private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
- new Comparator<WorkEntry>() {
- @Override public int compare(WorkEntry a, WorkEntry b) {
- if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
- return (a.command == WorkCommand.DELETE) ? 1 : -1;
- } else {
- return 0;
- }
- }
- });
-
- // Adapt the work queue into a sink.
- private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
- @Override public void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- workQueue.add(new WorkEntry(work, stateMachine, mutation));
- }
- };
+ private final EventSink eventSink;
private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
@@ -125,28 +102,6 @@ public class StateManagerImpl implements StateManager {
}
};
- private final Driver driver;
- private final Clock clock;
-
- /**
- * An item of work on the work queue.
- */
- private static class WorkEntry {
- private final WorkCommand command;
- private final TaskStateMachine stateMachine;
- private final Function<IScheduledTask, IScheduledTask> mutation;
-
- WorkEntry(
- WorkCommand command,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- this.command = command;
- this.stateMachine = stateMachine;
- this.mutation = mutation;
- }
- }
-
@Inject
StateManagerImpl(
final Storage storage,
@@ -155,20 +110,11 @@ public class StateManagerImpl implements StateManager {
TaskIdGenerator taskIdGenerator,
EventSink eventSink) {
- checkNotNull(storage);
+ this.storage = checkNotNull(storage);
this.clock = checkNotNull(clock);
-
- OperationFinalizer finalizer = new OperationFinalizer() {
- @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
- processWorkQueueInWriteOperation(work, store);
- }
- };
-
- this.storage = new SideEffectStorage(storage, finalizer, eventSink);
this.driver = checkNotNull(driver);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
- Stats.exportSize("work_queue_depth", workQueue);
+ this.eventSink = checkNotNull(eventSink);
}
@Override
@@ -179,12 +125,16 @@ public class StateManagerImpl implements StateManager {
final Set<IScheduledTask> scheduledTasks =
ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
- storage.write(storage.new NoResultQuietSideEffectWork() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
for (IScheduledTask task : scheduledTasks) {
- createStateMachine(task).updateState(PENDING);
+ updateTaskAndExternalState(
+ Tasks.id(task),
+ Optional.of(task),
+ PENDING,
+ Optional.<String>absent());
}
}
});
@@ -197,54 +147,55 @@ public class StateManagerImpl implements StateManager {
final ScheduleStatus newState,
final Optional<String> auditMessage) {
- return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
- @Override
- public Boolean apply(TaskStateMachine stateMachine) {
- return stateMachine.updateState(newState, auditMessage);
- }
- });
+ return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
}
@Override
public IAssignedTask assignTask(
- String taskId,
- String slaveHost,
- SlaveID slaveId,
- Set<Integer> assignedPorts) {
+ final String taskId,
+ final String slaveHost,
+ final SlaveID slaveId,
+ final Set<Integer> assignedPorts) {
checkNotBlank(taskId);
checkNotBlank(slaveHost);
+ checkNotNull(slaveId);
checkNotNull(assignedPorts);
- TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
- changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
- return mutation.getAssignedTask();
- }
-
- private boolean changeState(
- final String taskId,
- final Optional<ScheduleStatus> casState,
- final Function<TaskStateMachine, Boolean> stateChange) {
-
- return storage.write(storage.new QuietSideEffectWork<Boolean>() {
- @Override public Boolean apply(MutableStoreProvider storeProvider) {
- IScheduledTask task = Iterables.getOnlyElement(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
- null);
- if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
- return false;
- }
+ return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+ @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+ boolean success = updateTaskAndExternalState(
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ ASSIGNED,
+ Optional.<String>absent());
+
+ Preconditions.checkState(
+ success,
+ "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+ Query.Builder query = Query.taskScoped(taskId);
+ storeProvider.getUnsafeTaskStore().mutateTasks(query,
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override
+ public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask builder = task.newBuilder();
+ AssignedTask assigned = builder.getAssignedTask();
+ assigned.setAssignedPorts(
+ getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+ assigned.setSlaveHost(slaveHost)
+ .setSlaveId(slaveId.getValue());
+ return IScheduledTask.build(builder);
+ }
+ });
- return stateChange.apply(getStateMachine(taskId, task));
+ return Iterables.getOnlyElement(
+ Iterables.transform(
+ storeProvider.getTaskStore().fetchTasks(query),
+ Tasks.SCHEDULED_TO_ASSIGNED));
}
});
}
- private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
- IAssignedTask getAssignedTask();
- }
-
private static Map<String, Integer> getNameMappedPorts(
Set<String> portNames,
Set<Integer> allocatedPorts) {
@@ -270,157 +221,218 @@ public class StateManagerImpl implements StateManager {
return ports;
}
- private TaskAssignMutation assignHost(
- final String slaveHost,
- final SlaveID slaveId,
- final Set<Integer> assignedPorts) {
+ @VisibleForTesting
+ static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+ new Supplier<String>() {
+ @Override public String get() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.log(Level.SEVERE, "Failed to get self hostname.");
+ throw Throwables.propagate(e);
+ }
+ }
+ });
- final TaskMutation mutation = new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- AssignedTask assigned = builder.getAssignedTask();
- assigned.setAssignedPorts(
- getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
- assigned.setSlaveHost(slaveHost)
- .setSlaveId(slaveId.getValue());
- return IScheduledTask.build(builder);
- }
- };
+ private boolean updateTaskAndExternalState(
+ final Optional<ScheduleStatus> casState,
+ final String taskId,
+ final ScheduleStatus targetState,
+ final Optional<String> transitionMessage) {
- return new TaskAssignMutation() {
- private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
- @Override public IAssignedTask getAssignedTask() {
- return assignedTask.get();
- }
+ return storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+ null));
- @Override public Boolean apply(final TaskStateMachine stateMachine) {
- TaskMutation wrapper = new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- IScheduledTask mutated = mutation.apply(task);
- Preconditions.checkState(
- assignedTask.compareAndSet(null, mutated.getAssignedTask()),
- "More than one result was found for an identity query.");
- return mutated;
- }
- };
- return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+ // CAS operation fails if the task does not exist, or the states don't match.
+ if (casState.isPresent()
+ && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+ return false;
+ }
+
+ return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
}
- };
+ });
}
- private void processWorkQueueInWriteOperation(
- SideEffectWork<?, ?> sideEffectWork,
- MutableStoreProvider storeProvider) {
-
- for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
- final TaskStateMachine stateMachine = work.stateMachine;
-
- if (work.command == WorkCommand.KILL) {
- driver.killTask(stateMachine.getTaskId());
- } else {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- String taskId = stateMachine.getTaskId();
- Query.Builder idQuery = Query.taskScoped(taskId);
-
- switch (work.command) {
- case RESCHEDULE:
- ScheduledTask builder =
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
- builder.getAssignedTask().unsetSlaveId();
- builder.getAssignedTask().unsetSlaveHost();
- builder.getAssignedTask().unsetAssignedPorts();
- builder.unsetTaskEvents();
- builder.setAncestorId(taskId);
- String newTaskId = taskIdGenerator.generate(
- ITaskConfig.build(builder.getAssignedTask().getTask()),
- builder.getAssignedTask().getInstanceId());
- builder.getAssignedTask().setTaskId(newTaskId);
-
- LOG.info("Task being rescheduled: " + taskId);
-
- IScheduledTask task = IScheduledTask.build(builder);
- taskStore.saveTasks(ImmutableSet.of(task));
-
- createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
- ITaskConfig taskInfo = task.getAssignedTask().getTask();
- sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskRescheduled(
- taskInfo.getOwner().getRole(),
- taskInfo.getJobName(),
- task.getAssignedTask().getInstanceId()));
- break;
-
- case UPDATE_STATE:
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return work.mutation.apply(
- IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
- }
- });
- sideEffectWork.addTaskEvent(
- PubsubEvent.TaskStateChange.transition(
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
- stateMachine.getPreviousState()));
- break;
-
- case DELETE:
- deleteTasks(ImmutableSet.of(taskId));
- break;
-
- case INCREMENT_FAILURES:
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return IScheduledTask.build(
- task.newBuilder().setFailureCount(task.getFailureCount() + 1));
- }
- });
- break;
+ private static final Function<SideEffect, SideEffect.Action> GET_ACTION =
+ new Function<SideEffect, Action>() {
+ @Override public Action apply(SideEffect sideEffect) {
+ return sideEffect.getAction();
+ }
+ };
+
+ private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+ Action.INCREMENT_FAILURES,
+ Action.SAVE_STATE,
+ Action.STATE_CHANGE,
+ Action.RESCHEDULE,
+ Action.KILL,
+ Action.DELETE);
+ static {
+ // Sanity check to ensure no actions are missing.
+ Preconditions.checkState(
+ ImmutableSet.copyOf(ACTIONS_IN_ORDER).equals(ImmutableSet.copyOf(Action.values())),
+ "Not all actions are included in ordering.");
+ }
+
+ // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+ // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+ // (thus not carrying forward the failure increment).
+ private static final Ordering<SideEffect> ACTION_ORDER =
+ Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
- default:
- LOG.severe("Unrecognized work command type " + work.command);
+ private boolean updateTaskAndExternalState(
+ final String taskId,
+ // Note: This argument is deliberately non-final, and should not be made final.
+ // This is because using the captured value within the storage operation below is
+ // highly-risky, since it doesn't necessarily represent the value in storage.
+ // As a result, it would be easy to accidentally clobber mutations.
+ Optional<IScheduledTask> task,
+ final ScheduleStatus targetState,
+ final Optional<String> transitionMessage) {
+
+ if (task.isPresent()) {
+ Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+ }
+
+ final List<PubsubEvent> events = Lists.newArrayList();
+
+ final TaskStateMachine stateMachine = task.isPresent()
+ ? new TaskStateMachine(task.get())
+ : new TaskStateMachine(taskId);
+
+ boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ TransitionResult result = stateMachine.updateState(targetState);
+ Query.Builder query = Query.taskScoped(taskId);
+
+ for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+ Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+ Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+ switch (sideEffect.getAction()) {
+ case KILL:
+ driver.killTask(taskId);
+ break;
+
+ case RESCHEDULE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+ LOG.info("Task being rescheduled: " + taskId);
+
+ ScheduledTask builder = upToDateTask.get().newBuilder();
+ builder.setStatus(INIT);
+ builder.getAssignedTask().unsetSlaveId();
+ builder.getAssignedTask().unsetSlaveHost();
+ builder.getAssignedTask().unsetAssignedPorts();
+ builder.unsetTaskEvents();
+ builder.setAncestorId(taskId);
+ String newTaskId = taskIdGenerator.generate(
+ ITaskConfig.build(builder.getAssignedTask().getTask()),
+ builder.getAssignedTask().getInstanceId());
+ builder.getAssignedTask().setTaskId(newTaskId);
+
+ IScheduledTask newTask = IScheduledTask.build(builder);
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+ updateTaskAndExternalState(
+ newTaskId,
+ Optional.of(newTask),
+ PENDING, Optional.of("Rescheduled"));
+
+ ITaskConfig taskInfo = newTask.getAssignedTask().getTask();
+ events.add(
+ new PubsubEvent.TaskRescheduled(
+ taskInfo.getOwner().getRole(),
+ taskInfo.getJobName(),
+ newTask.getAssignedTask().getInstanceId()));
+ break;
+
+ case SAVE_STATE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+
+ storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask mutableTask = task.newBuilder();
+ mutableTask.setStatus(stateMachine.getState());
+ mutableTask.addToTaskEvents(new TaskEvent()
+ .setTimestamp(clock.nowMillis())
+ .setStatus(targetState)
+ .setMessage(transitionMessage.orNull())
+ .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+ return IScheduledTask.build(mutableTask);
+ }
+ });
+ events.add(
+ PubsubEvent.TaskStateChange.transition(
+ Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+ stateMachine.getPreviousState()));
+ break;
+
+ case DELETE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+
+ events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+ break;
+
+ case INCREMENT_FAILURES:
+ storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ return IScheduledTask.build(
+ task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+ }
+ });
+ break;
+
+ case STATE_CHANGE:
+ updateTaskAndExternalState(
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ sideEffect.getNextState().get(),
+ Optional.<String>absent());
+ break;
+
+ default:
+ throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+ }
}
+
+ return result.isSuccess();
}
+ });
+
+ // Note: Delaying events until after the write operation is somewhat futile, since the state
+ // may actually not be written to durable store (e.g. if this is a nested transaction).
+ // Ideally, Storage would add a facility to attach side-effects that are performed after the
+ // outer-most transaction completes (meaning state has been durably persisted).
+ for (PubsubEvent event : events) {
+ eventSink.post(event);
}
+
+ return success;
}
@Override
public void deleteTasks(final Set<String> taskIds) {
- storage.write(storage.new NoResultQuietSideEffectWork() {
- @Override protected void execute(final MutableStoreProvider storeProvider) {
+ eventSink.post(storage.write(new MutateWork.Quiet<PubsubEvent>() {
+ @Override public PubsubEvent apply(MutableStoreProvider storeProvider) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
- addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
- taskStore.deleteTasks(taskIds);
+ return deleteTasks(storeProvider, taskIds);
}
- });
+ })); // Post only after the write returns, matching how transition events are posted above.
}
- private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
- if (task != null) {
- return createStateMachine(task, task.getStatus());
- }
-
- // The task is unknown, not present in storage.
- TaskStateMachine stateMachine = new TaskStateMachine(
- taskId,
- null,
- workSink,
- clock,
- INIT);
- stateMachine.updateState(UNKNOWN);
- return stateMachine;
- }
-
- private TaskStateMachine createStateMachine(IScheduledTask task) {
- return createStateMachine(task, INIT);
- }
-
- private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
- return new TaskStateMachine(
- Tasks.id(task),
- task,
- workSink,
- clock,
- initialState);
+ private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+ TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+ taskStore.deleteTasks(taskIds);
+ return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..cd0899c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
*/
package org.apache.aurora.scheduler.state;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.twitter.common.base.Closure;
import com.twitter.common.base.Closures;
import com.twitter.common.base.Command;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
import com.twitter.common.util.StateMachine;
import com.twitter.common.util.StateMachine.Rule;
import com.twitter.common.util.StateMachine.Transition;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
/**
* State machine for a task.
* <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
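- * to different state transitions. These responses are externally communicated by populating a
- * provided work queue.
+ * to different state transitions. These responses are returned to the caller as a set of
+ * side-effects resulting from each transition.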
* <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- * abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to force the one-time-use nature. This is probably best done
+ * by hiding the constructor and exposing only a static function to transition a task and get the
+ * resulting actions.
*/
class TaskStateMachine {
private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,183 +80,94 @@ class TaskStateMachine {
private static final AtomicLong ILLEGAL_TRANSITIONS =
Stats.exportLong("scheduler_illegal_task_state_transitions");
- // Re-declarations of statuses as wrapped state objects.
- private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
- private static final State FAILED = State.create(ScheduleStatus.FAILED);
- private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
- private static final State INIT = State.create(ScheduleStatus.INIT);
- private static final State KILLED = State.create(ScheduleStatus.KILLED);
- private static final State KILLING = State.create(ScheduleStatus.KILLING);
- private static final State LOST = State.create(ScheduleStatus.LOST);
- private static final State PENDING = State.create(ScheduleStatus.PENDING);
- private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
- private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
- private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
- private static final State STARTING = State.create(ScheduleStatus.STARTING);
- private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
- @VisibleForTesting
- static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
- new Supplier<String>() {
- @Override public String get() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- LOG.log(Level.SEVERE, "Failed to get self hostname.");
- throw Throwables.propagate(e);
- }
- }
- });
-
- private final String taskId;
- private final WorkSink workSink;
- private final StateMachine<State> stateMachine;
+ private final StateMachine<ScheduleStatus> stateMachine;
private ScheduleStatus previousState = null;
- private final Clock clock;
-
- /**
- * Composes a schedule status and a state change argument. Only the ScheduleStatuses in two
- * States must be equal for them to be considered equal.
- */
- private static class State {
- private final ScheduleStatus state;
- private final Function<IScheduledTask, IScheduledTask> mutation;
-
- State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
- this.state = state;
- this.mutation = mutation;
- }
-
- static State create(ScheduleStatus status) {
- return create(status, Functions.<IScheduledTask>identity());
- }
-
- static State create(
- ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- return new State(status, mutation);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof State)) {
- return false;
- }
-
- if (o == this) {
- return true;
- }
-
- State other = (State) o;
- return state == other.state;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(state)
- .toHashCode();
- }
- @Override
- public String toString() {
- return state.toString();
- }
-
- private ScheduleStatus getState() {
- return state;
- }
-
- private Function<IScheduledTask, IScheduledTask> getMutation() {
- return mutation;
- }
- }
+ private final Set<SideEffect> sideEffects = Sets.newHashSet();
/**
- * A write-only work acceptor.
+ * Creates a new task state machine representing a non-existent task. This allows for consistent
+ * state-reconciliation actions when the external system disagrees with the scheduler.
+ *
+ * @param name Name of the state machine, for logging.
*/
- public interface WorkSink {
- /**
- * Appends external work that must be performed for a state machine transition to be fully
- * complete.
- *
- * @param work Description of the work to be performed.
- * @param stateMachine The state machine that the work is associated with.
- * @param mutation Mutate operation to perform along with the state transition.
- */
- void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation);
+ public TaskStateMachine(String name) {
+ this(name, Optional.<IScheduledTask>absent());
}
/**
- * Creates a new task state machine.
- *
- * @param taskId ID of the task managed by this state machine.
+ * Creates a new task state machine representing an existing task. The state machine will be
+ * named with the task's ID.
+ *
* @param task Read-only task that this state machine manages.
- * @param workSink Work sink to receive transition response actions
- * @param clock Clock to use for reading the current time.
- * @param initialState The state to begin the state machine at. All legal transitions will be
- * added, but this allows the state machine to 'skip' states, for instance when a task is
- * loaded from a persistent store.
*/
- public TaskStateMachine(
- final String taskId,
- final IScheduledTask task,
- final WorkSink workSink,
- final Clock clock,
- final ScheduleStatus initialState) {
-
- this.taskId = MorePreconditions.checkNotBlank(taskId);
- this.workSink = checkNotNull(workSink);
- this.clock = checkNotNull(clock);
- checkNotNull(initialState);
-
- @SuppressWarnings("unchecked")
- Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
- /* Kill a task that we believe to be terminated when an attempt is made to revive. */
- Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
- addWorkClosure(WorkCommand.KILL)),
- /* Remove a terminated task that is remotely removed. */
- Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
- final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case ASSIGNED:
- case STARTING:
- case RUNNING:
- addWork(WorkCommand.KILL);
- break;
-
- case LOST:
- addWork(WorkCommand.KILL);
- // fall through
-
- case FINISHED:
- case FAILED:
- case KILLED:
- addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
- break;
-
- case UNKNOWN:
- updateState(ScheduleStatus.LOST);
- break;
-
- default:
- // No-op.
- }
- }
- };
+ public TaskStateMachine(IScheduledTask task) {
+ this(Tasks.id(task), Optional.of(task));
+ }
+
+ private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+ MorePreconditions.checkNotBlank(name);
+ checkNotNull(task);
+
+ final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+ Preconditions.checkState((initialState == UNKNOWN) == !task.isPresent());
+
+ Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+ ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+ // Kill a task that we believe to be terminated when an attempt is made to revive.
+ .add(Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING), addWorkClosure(KILL)))
+ // Remove a terminated task that is remotely removed.
+ .add(Closures.filter(Transition.to(UNKNOWN), addWorkClosure(DELETE)))
+ .build());
+
+ final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
+ case ASSIGNED:
+ addFollowup(KILL);
+ break;
+
+ case STARTING:
+ addFollowup(KILL);
+ break;
+
+ case RUNNING:
+ addFollowup(KILL);
+ break;
+
+ case LOST:
+ addFollowup(KILL);
+ addFollowup(RESCHEDULE);
+ break;
+
+ case FINISHED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case FAILED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case KILLED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case UNKNOWN:
+ addFollowupTransition(LOST);
+ break;
+
+ default:
+ // No-op.
+ }
+ }
+ };
// To be called on a task transitioning into the FINISHED state.
final Command rescheduleIfService = new Command() {
@Override public void execute() {
- if (task.getAssignedTask().getTask().isIsService()) {
- addWork(WorkCommand.RESCHEDULE);
+ if (task.get().getAssignedTask().getTask().isIsService()) {
+ addFollowup(RESCHEDULE);
}
}
};
@@ -248,24 +175,26 @@ class TaskStateMachine {
// To be called on a task transitioning into the FAILED state.
final Command incrementFailuresMaybeReschedule = new Command() {
@Override public void execute() {
- addWork(WorkCommand.INCREMENT_FAILURES);
+ addFollowup(INCREMENT_FAILURES);
// Max failures is ignored for service task.
- boolean isService = task.getAssignedTask().getTask().isIsService();
+ boolean isService = task.get().getAssignedTask().getTask().isIsService();
// Max failures is ignored when set to -1.
- int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
- if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
- addWork(WorkCommand.RESCHEDULE);
+ int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+ boolean belowMaxFailures =
+ (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+ if (isService || belowMaxFailures) {
+ addFollowup(RESCHEDULE);
} else {
- LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+ LOG.info("Task " + name + " reached failure limit, not rescheduling");
}
}
};
- stateMachine = StateMachine.<State>builder(taskId)
+ stateMachine = StateMachine.<ScheduleStatus>builder(name)
.logTransitions()
- .initialState(State.create(initialState))
+ .initialState(initialState)
.addState(
Rule.from(INIT)
.to(PENDING, UNKNOWN))
@@ -273,11 +202,11 @@ class TaskStateMachine {
Rule.from(PENDING)
.to(ASSIGNED, KILLING)
.withCallback(
- new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case KILLING:
- addWork(WorkCommand.DELETE);
+ addFollowup(DELETE);
break;
default:
@@ -291,16 +220,15 @@ class TaskStateMachine {
.to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
KILLING, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -308,25 +236,24 @@ class TaskStateMachine {
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
- // fall through
- case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(RESCHEDULE);
+ addFollowup(KILL);
break;
- case UNKNOWN:
+ case KILLING:
+ addFollowup(KILL);
break;
- default:
- // No-op.
+ default:
+ // No-op.
}
}
}
@@ -335,20 +262,19 @@ class TaskStateMachine {
Rule.from(STARTING)
.to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -356,25 +282,25 @@ class TaskStateMachine {
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case UNKNOWN:
// The slave previously acknowledged that it had the task, and now
// stopped reporting it.
- updateState(ScheduleStatus.LOST);
+ addFollowupTransition(LOST);
break;
- default:
- // No-op.
+ default:
+ // No-op.
}
}
}
@@ -383,20 +309,19 @@ class TaskStateMachine {
Rule.from(RUNNING)
.to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -404,19 +329,19 @@ class TaskStateMachine {
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case UNKNOWN:
- updateState(ScheduleStatus.LOST);
+ addFollowupTransition(LOST);
break;
default:
@@ -460,23 +385,25 @@ class TaskStateMachine {
// Since we want this action to be performed last in the transition sequence, the callback
// must be the last chained transition callback.
.onAnyTransition(
- new Closure<Transition<State>>() {
- @Override public void execute(final Transition<State> transition) {
- ScheduleStatus from = transition.getFrom().getState();
- ScheduleStatus to = transition.getTo().getState();
-
- if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
- // Prevent an update when killing a pending task, since the task is deleted
- // prior to the update.
- && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
- addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
- } else if (!transition.isAllowed()) {
- LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
- ILLEGAL_TRANSITIONS.incrementAndGet();
- }
-
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(final Transition<ScheduleStatus> transition) {
if (transition.isValidStateChange()) {
+ ScheduleStatus from = transition.getFrom();
+ ScheduleStatus to = transition.getTo();
+
+ // TODO(wfarner): Clean up this hack. This is here to suppress unnecessary work
+ // (save followed by delete), but it shows a wart with this catch-all behavior.
+ // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+ boolean pendingDeleteHack = !((from == PENDING) && (to == KILLING));
+
+ // Don't bother saving state of a task that is being removed.
+ if ((to != UNKNOWN) && pendingDeleteHack) {
+ addFollowup(SAVE_STATE);
+ }
previousState = from;
+ } else {
+ LOG.severe("Illegal state transition attempted: " + transition);
+ ILLEGAL_TRANSITIONS.incrementAndGet();
}
}
}
@@ -490,56 +417,67 @@ class TaskStateMachine {
.build();
}
- private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
- return new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> item) {
- addWork(work);
- }
- };
+ private void addFollowup(Action action) {
+ addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
}
- private void addWork(WorkCommand work) {
- addWork(work, Functions.<IScheduledTask>identity());
+ private void addFollowupTransition(ScheduleStatus status) {
+ addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
}
- private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
- LOG.info("Adding work command " + work + " for " + this);
- workSink.addWork(work, TaskStateMachine.this, mutation);
+ private void addFollowup(SideEffect sideEffect) {
+ LOG.info("Adding side-effect " + sideEffect + " for " + this);
+ sideEffects.add(sideEffect);
+ }
- /**
- * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status) {
- return updateState(status, Functions.<IScheduledTask>identity());
+ private Closure<Transition<ScheduleStatus>> addWorkClosure(final Action action) {
+ return new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> item) {
+ addFollowup(action);
+ }
+ };
}
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @param auditMessage The (optional) audit message to associate with the transition.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
- return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
- }
+ public static class TransitionResult {
+ private final boolean success;
+ private final Set<SideEffect> sideEffects;
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
- *
- * @param status Status to apply to the task.
- * @param mutation Mutate operation to perform while updating the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(
- ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation) {
+ public TransitionResult(boolean success, Set<SideEffect> sideEffects) {
+ this.success = success;
+ this.sideEffects = ImmutableSet.copyOf(sideEffects); // Defensive copy; backs equals/hashCode.
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public Set<SideEffect> getSideEffects() {
+ return sideEffects;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TransitionResult)) {
+ return false;
+ }
+
+ TransitionResult other = (TransitionResult) o;
+ return (success == other.success)
+ && Objects.equal(sideEffects, other.sideEffects);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(success, sideEffects);
+ }
- return updateState(status, mutation, Optional.<String>absent());
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("success", success)
+ .add("sideEffects", sideEffects)
+ .toString();
+ }
}
/**
@@ -548,59 +486,35 @@ class TaskStateMachine {
- * will be appended to the work queue.
+ * will be included in the returned result.
*
* @param status Status to apply to the task.
- * @param auditMessage The audit message to associate with the transition.
- * @param mutation Mutate operation to perform while updating the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
+ * @return Whether the state change was allowed, along with any side-effects it produced.
*/
- public synchronized boolean updateState(
- final ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation,
- final Optional<String> auditMessage) {
-
+ public synchronized TransitionResult updateState(final ScheduleStatus status) {
checkNotNull(status);
- checkNotNull(mutation);
- checkNotNull(auditMessage);
+ Preconditions.checkState(sideEffects.isEmpty());
/**
* Don't bother applying noop state changes. If we end up modifying task state without a
* state transition (e.g. storing resource consumption of a running task), we need to find
* a different way to suppress noop transitions.
*/
- if (stateMachine.getState().getState() != status) {
- Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
- new Function<IScheduledTask, IScheduledTask>() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- builder.addToTaskEvents(new TaskEvent()
- .setTimestamp(clock.nowMillis())
- .setStatus(status)
- .setMessage(auditMessage.orNull())
- .setScheduler(LOCAL_HOST_SUPPLIER.get()));
- return IScheduledTask.build(builder);
- }
- });
- return stateMachine.transition(State.create(status, operation));
+ if (stateMachine.getState() == status) {
+ return new TransitionResult(false, ImmutableSet.<SideEffect>of());
}
- return false;
+ boolean success = stateMachine.transition(status);
+ Set<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+ sideEffects.clear();
+ return new TransitionResult(success, transitionEffects);
}
/**
* Fetch the current state from the state machine.
+ * TODO(wfarner): Consider removing, the caller should know this.
*
* @return The current state.
*/
public synchronized ScheduleStatus getState() {
- return stateMachine.getState().getState();
- }
-
- /**
- * Gets the ID for the task that this state machine manages.
- *
- * @return The state machine's task ID.
- */
- public String getTaskId() {
- return taskId;
+ return stateMachine.getState();
}
/**
@@ -616,6 +530,6 @@ class TaskStateMachine {
@Override
public String toString() {
- return getTaskId();
+ return stateMachine.getName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
- // Send an instruction for the runner of this task to kill the task.
- KILL,
- // Create a new state machine with a copy of this task.
- RESCHEDULE,
- // Update the task's state (schedule status) in the persistent store to match the state machine.
- UPDATE_STATE,
- // Delete this task from the persistent store.
- DELETE,
- // Increment the failure count for this task.
- INCREMENT_FAILURES
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..26468ce 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -200,6 +200,10 @@ public interface Storage {
/**
* Executes the unit of mutating {@code work}.
+ * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+ * performed after completion of the outer-most transaction. As it stands, it's somewhat
+ * futile to try to achieve this within a transaction, since the local code does not know
+ * if the current transaction is nested.
*
* @param work The unit of work to execute.
* @param <T> The type of result this unit of work produces.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
/**
* Offers temporary mutable access to tasks. If a task ID is not found, it will be silently
* skipped, and no corresponding task will be returned.
+ * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
+ * and it prevents the caller from worrying about a bad query having broad impact.
*
* @param query Query to match tasks against.
* @param mutator The mutate operation.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b93e47f..af20e82 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -40,6 +41,7 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskRescheduled;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.storage.Storage;
@@ -50,14 +52,14 @@ import org.apache.mesos.Protos.SlaveID;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IArgumentMatcher;
-import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
@@ -65,6 +67,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class StateManagerImplTest extends EasyMockTest {
@@ -90,11 +93,6 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
}
- @After
- public void validateCompletion() {
- assertTrue(stateManager.getStorage().getEvents().isEmpty());
- }
-
private static class StateChangeMatcher implements IArgumentMatcher {
private final String taskId;
private final ScheduleStatus from;
@@ -171,7 +169,7 @@ public class StateManagerImplTest extends EasyMockTest {
.setStatus(PENDING)
.setTaskEvents(ImmutableList.of(new TaskEvent()
.setTimestamp(clock.nowMillis())
- .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+ .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
.setStatus(PENDING)))
.setAssignedTask(new AssignedTask()
.setInstanceId(3)
@@ -265,6 +263,87 @@ public class StateManagerImplTest extends EasyMockTest {
changeState(unknownTask, RUNNING);
}
+ @Test
+ public void testIncrementFailureCount() {
+ ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+
+ String taskId2 = "a2";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+ expectStateTransitions(taskId2, INIT, PENDING);
+ eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+ control.replay();
+
+ insertTask(task, 0);
+
+ assignTask(taskId, HOST_A);
+ changeState(taskId, RUNNING);
+ changeState(taskId, FAILED);
+ IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+ Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+ assertEquals(1, rescheduledTask.getFailureCount());
+ }
+
+ @Test
+ public void testDoubleTransition() {
+ // Tests that when a transition induces another (STATE_CHANGE action), both are performed.
+
+ ITaskConfig task = makeTask(JIM, MY_JOB);
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+ String taskId2 = "a2";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+ expectStateTransitions(taskId2, INIT, PENDING);
+ eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+ control.replay();
+
+ insertTask(task, 0);
+
+ assignTask(taskId, HOST_A);
+ changeState(taskId, RUNNING);
+ changeState(taskId, UNKNOWN);
+ }
+
+ @Test
+ public void testCasTaskPresent() {
+ ITaskConfig task = makeTask(JIM, MY_JOB);
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+
+ control.replay();
+
+ insertTask(task, 0);
+ assignTask(taskId, HOST_A);
+ assertFalse(stateManager.changeState(
+ taskId,
+ Optional.of(PENDING),
+ RUNNING,
+ Optional.<String>absent()));
+ assertTrue(stateManager.changeState(
+ taskId,
+ Optional.of(ASSIGNED),
+ FAILED,
+ Optional.<String>absent()));
+ }
+
+ @Test
+ public void testCasTaskNotFound() {
+ control.replay();
+
+ assertFalse(stateManager.changeState(
+ "a",
+ Optional.of(PENDING),
+ ASSIGNED,
+ Optional.<String>absent()));
+ }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,