You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/13 21:33:41 UTC
[1/3] git commit: Begin cleanup of StateManager interface by removing state change via Query.
Updated Branches:
refs/heads/wfarner/state_machine_refactor [created] 70adc19ad
Begin cleanup of StateManager interface by removing state change via Query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/801dcfbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/801dcfbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/801dcfbd
Branch: refs/heads/wfarner/state_machine_refactor
Commit: 801dcfbd55e3edfd47e1a504d4fd5e34fc3c2eb6
Parents: 7e87588
Author: Bill Farner <bi...@twitter.com>
Authored: Wed Jan 8 16:18:48 2014 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Mon Jan 13 12:22:05 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/UserTaskLauncher.java | 4 +-
.../aurora/scheduler/async/TaskScheduler.java | 8 +--
.../aurora/scheduler/async/TaskTimeout.java | 9 ++-
.../scheduler/state/MaintenanceController.java | 21 +++---
.../aurora/scheduler/state/SchedulerCore.java | 4 +-
.../scheduler/state/SchedulerCoreImpl.java | 60 +++++++++++-----
.../aurora/scheduler/state/StateManager.java | 29 +++++---
.../scheduler/state/StateManagerImpl.java | 75 ++++++--------------
.../thrift/SchedulerThriftInterface.java | 3 +-
.../aurora/scheduler/UserTaskLauncherTest.java | 19 +++--
.../scheduler/async/TaskSchedulerTest.java | 5 +-
.../aurora/scheduler/async/TaskTimeoutTest.java | 26 +++++--
.../state/BaseSchedulerCoreImplTest.java | 4 +-
.../state/MaintenanceControllerImplTest.java | 7 +-
.../scheduler/state/StateManagerImplTest.java | 32 ++++++---
.../thrift/SchedulerThriftInterfaceTest.java | 2 +-
16 files changed, 174 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 7e180ac..010776e 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.mesos.Protos.Offer;
@@ -85,7 +84,8 @@ class UserTaskLauncher implements TaskLauncher {
}
stateManager.changeState(
- Query.taskScoped(status.getTaskId().getValue()),
+ status.getTaskId().getValue(),
+ Optional.<ScheduleStatus>absent(),
translatedState,
Optional.fromNullable(message));
} catch (SchedulerException e) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 96c76ba..4afc332 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -161,9 +161,9 @@ interface TaskScheduler extends EventSubscriber {
return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
@Override public TaskSchedulerResult apply(MutableStoreProvider store) {
LOG.fine("Attempting to schedule task " + taskId);
- Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
- final IScheduledTask task =
- Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+ final IScheduledTask task = Iterables.getOnlyElement(
+ store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+ null);
if (task == null) {
LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
} else {
@@ -182,7 +182,7 @@ interface TaskScheduler extends EventSubscriber {
// It is in the LOST state and a new task will move to PENDING to replace it.
// Should the state change fail due to storage issues, that's okay. The task will
// time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+ stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 9f14a99..64a1941 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -42,7 +42,6 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.Clock;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
@@ -186,9 +185,13 @@ class TaskTimeout implements EventSubscriber {
// timeout is still valid. Ideally, the future would have already been canceled, but in the
// event of a state transition race, including transientState prevents an unintended
// task timeout.
- Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
// Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+ if (stateManager.changeState(
+ key.taskId,
+ Optional.of(key.status),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE)) {
+
timedOutTasks.incrementAndGet();
} else {
LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3dd2271..af10d44 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
-import com.twitter.common.base.Closure;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.HostStatus;
@@ -119,11 +118,7 @@ public interface MaintenanceController {
this.eventSink = checkNotNull(eventSink);
}
- private Set<HostStatus> watchDrainingTasks(
- MutableStoreProvider store,
- Set<String> hosts,
- Closure<Query.Builder> callback) {
-
+ private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
Set<String> emptyHosts = Sets.newHashSet();
for (String host : hosts) {
// If there are no tasks on the host, immediately transition to DRAINED.
@@ -134,7 +129,13 @@ public interface MaintenanceController {
if (activeTasks.isEmpty()) {
emptyHosts.add(host);
} else {
- callback.execute(query);
+ for (String taskId : activeTasks) {
+ stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.RESTARTING,
+ DRAINING_MESSAGE);
+ }
}
}
@@ -186,11 +187,7 @@ public interface MaintenanceController {
public Set<HostStatus> drain(final Set<String> hosts) {
return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
@Override public Set<HostStatus> apply(MutableStoreProvider store) {
- return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
- @Override public void execute(Query.Builder query) {
- stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
- }
- });
+ return watchDrainingTasks(store, hosts);
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index b89d990..7a88629 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -83,11 +83,11 @@ public interface SchedulerCore {
/**
* Assigns a new state to tasks.
*
- * @param query Builder for a query to identify tasks
+ * @param taskId ID of the task to transition.
* @param status The new state of the tasks.
* @param message Additional information about the state transition.
*/
- void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+ void setTaskStatus(String taskId, ScheduleStatus status, Optional<String> message);
/**
* Kills a specific set of tasks.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 1d450f2..8dec283 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -75,9 +75,8 @@ class SchedulerCoreImpl implements SchedulerCore {
// Schedulers that are responsible for triggering execution of jobs.
private final ImmutableList<JobManager> jobManagers;
- // TODO(Bill Farner): Avoid using StateManagerImpl.
// State manager handles persistence of task modifications and state transitions.
- private final StateManagerImpl stateManager;
+ private final StateManager stateManager;
private final TaskIdGenerator taskIdGenerator;
private final JobFilter jobFilter;
@@ -97,7 +96,7 @@ class SchedulerCoreImpl implements SchedulerCore {
Storage storage,
CronJobManager cronScheduler,
ImmediateJobManager immediateScheduler,
- StateManagerImpl stateManager,
+ StateManager stateManager,
TaskIdGenerator taskIdGenerator,
JobFilter jobFilter) {
@@ -118,7 +117,9 @@ class SchedulerCoreImpl implements SchedulerCore {
@Override
public synchronized void tasksDeleted(Set<String> taskIds) {
- setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+ for (String taskId : taskIds) {
+ setTaskStatus(taskId, ScheduleStatus.UNKNOWN, Optional.<String>absent());
+ }
}
@Override
@@ -245,18 +246,20 @@ class SchedulerCoreImpl implements SchedulerCore {
@Override
public synchronized void setTaskStatus(
- Query.Builder query,
+ String taskId,
final ScheduleStatus status,
Optional<String> message) {
- checkNotNull(query);
+ checkNotNull(taskId);
checkNotNull(status);
- stateManager.changeState(query, status, message);
+ stateManager.changeState(taskId, Optional.<ScheduleStatus>absent(), status, message);
}
@Override
- public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+ public synchronized void killTasks(Query.Builder query, final String user)
+ throws ScheduleException {
+
checkNotNull(query);
LOG.info("Killing tasks matching " + query);
@@ -274,10 +277,28 @@ class SchedulerCoreImpl implements SchedulerCore {
}
// Unless statuses were specifically supplied, only attempt to kill active tasks.
- Query.Builder taskQuery = query.get().isSetStatuses() ? query.byStatus(ACTIVE_STATES) : query;
+ final Query.Builder taskQuery = query.get().isSetStatuses()
+ ? query.byStatus(ACTIVE_STATES)
+ : query;
+
+ int tasksAffected = storage.write(new MutateWork.Quiet<Integer>() {
+ @Override public Integer apply(MutableStoreProvider storeProvider) {
+ int total = 0;
+ for (String taskId : Tasks.ids(storeProvider.getTaskStore().fetchTasks(taskQuery))) {
+ boolean changed = stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ KILLING,
+ Optional.of("Killed by " + user));
+
+ if (changed) {
+ total++;
+ }
+ }
+ return total;
+ }
+ });
- int tasksAffected =
- stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
if (!jobDeleted && (tasksAffected == 0)) {
throw new ScheduleException("No jobs to kill");
}
@@ -307,22 +328,27 @@ class SchedulerCoreImpl implements SchedulerCore {
throw new ScheduleException("Not all requested shards are active.");
}
LOG.info("Restarting shards matching " + query);
- stateManager.changeState(
- Query.taskScoped(Tasks.ids(matchingTasks)),
- RESTARTING,
- Optional.of("Restarted by " + requestingUser));
+ for (String taskId : Tasks.ids(matchingTasks)) {
+ stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ RESTARTING,
+ Optional.of("Restarted by " + requestingUser));
+ }
}
});
}
-
@Override
public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
checkNotNull(task);
checkNotNull(preemptingTask);
// TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
- stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+ stateManager.changeState(
+ task.getTaskId(),
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.PREEMPTING,
Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 045165d..785a5ef 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -21,27 +21,36 @@ import java.util.Set;
import com.google.common.base.Optional;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos.SlaveID;
/**
- * Thin interface for the state manager.
+ * A manager for the state of tasks. Most modifications to tasks should be made here, especially
+ * those that alter the {@link ScheduleStatus} of tasks.
*/
public interface StateManager {
/**
- * Performs a simple state change, transitioning all tasks matching a query to the given
- * state and applying the given audit message.
- * TODO(William Farner): Consider removing the return value.
+ * Attempts to alter a task from its existing state to {@code newState}. If a {@code casState} is
+ * provided, the transition will only be performed if the task is currently in that state.
+ *
+ * @param taskId ID of the task to transition.
+ * @param casState State that the task must be in for the operation to proceed. If the task
+ * is found to not be in {@code casState}, no action is performed and
+ * {@code false} is returned.
+ * @param newState State to move the task to.
+ * @param auditMessage Message to include with the transition.
+ * @return {@code true} if the transition was performed and the task was moved to
+ * {@code newState}, {@code false} if the transition was not allowed, or the task was not
+ * in {@code casState}.
*
- * @param query Builder of the query to perform, the results of which will be modified.
- * @param newState State to move the resulting tasks into.
- * @param auditMessage Audit message to apply along with the state change.
- * @return the number of successful state changes.
*/
- int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+ boolean changeState(
+ String taskId,
+ Optional<ScheduleStatus> casState,
+ ScheduleStatus newState,
+ Optional<String> auditMessage);
/**
* Assigns a task to a specific slave.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index b6dd537..6078eee 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,8 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -53,8 +51,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -76,8 +72,8 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
* Manager of all persistence-related operations for the scheduler. Acts as a controller for
* persisted state machine transitions, and their side-effects.
*
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
+ * TODO(wfarner): This class is due for an overhaul. There are several aspects of it that could
+ * probably be made much simpler. The first phase removes state changes via Query.
*/
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
@@ -195,13 +191,15 @@ public class StateManagerImpl implements StateManager {
}
@Override
- public int changeState(
- Query.Builder query,
+ public boolean changeState(
+ String taskId,
+ Optional<ScheduleStatus> casState,
final ScheduleStatus newState,
final Optional<String> auditMessage) {
- return changeState(query, new Function<TaskStateMachine, Boolean>() {
- @Override public Boolean apply(TaskStateMachine stateMachine) {
+ return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
+ @Override
+ public Boolean apply(TaskStateMachine stateMachine) {
return stateMachine.updateState(newState, auditMessage);
}
});
@@ -219,34 +217,26 @@ public class StateManagerImpl implements StateManager {
checkNotNull(assignedPorts);
TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
- changeState(Query.taskScoped(taskId), mutation);
+ changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
return mutation.getAssignedTask();
}
- private int changeStateInWriteOperation(
- Set<String> taskIds,
- Function<TaskStateMachine, Boolean> stateChange) {
-
- int count = 0;
- for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
- if (stateChange.apply(stateMachine)) {
- ++count;
- }
- }
- return count;
- }
-
- private int changeState(
- final Query.Builder query,
+ private boolean changeState(
+ final String taskId,
+ final Optional<ScheduleStatus> casState,
final Function<TaskStateMachine, Boolean> stateChange) {
- return storage.write(storage.new QuietSideEffectWork<Integer>() {
- @Override public Integer apply(MutableStoreProvider storeProvider) {
- Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- return changeStateInWriteOperation(ids, stateChange);
+ return storage.write(storage.new QuietSideEffectWork<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ IScheduledTask task = Iterables.getOnlyElement(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+ null);
+ if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
+ return false;
+ }
+
+ return stateChange.apply(getStateMachine(taskId, task));
}
});
}
@@ -405,27 +395,6 @@ public class StateManagerImpl implements StateManager {
});
}
- private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
- return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
- @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
- Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
- new Function<IScheduledTask, String>() {
- @Override public String apply(IScheduledTask input) {
- return input.getAssignedTask().getTaskId();
- }
- });
-
- ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
- for (String taskId : taskIds) {
- // Pass null get() values through.
- builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
- }
- return builder.build();
- }
- });
- }
-
private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
if (task != null) {
return createStateMachine(task, task.getStatus());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index c1a11bd..76caa62 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -672,8 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
return response;
}
- schedulerCore.setTaskStatus(
- Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
+ schedulerCore.setTaskStatus(taskId, status, transitionMessage(context.getIdentity()));
return new Response().setResponseCode(OK).setMessage("Transition attempted.");
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index bba1b72..ffc37db 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Iterables;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.OfferQueue;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -88,9 +88,12 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testForwardsStatusUpdates() throws Exception {
- expect(
- stateManager.changeState(Query.taskScoped(TASK_ID_A), RUNNING, Optional.of("fake message")))
- .andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
+ RUNNING,
+ Optional.of("fake message")))
+ .andReturn(true);
control.replay();
@@ -114,7 +117,8 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test(expected = StorageException.class)
public void testFailedStatusUpdate() throws Exception {
expect(stateManager.changeState(
- Query.taskScoped(TASK_ID_A),
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
RUNNING,
Optional.of("fake message")))
.andThrow(new StorageException("Injected error"));
@@ -132,10 +136,11 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testMemoryLimitTranslationHack() throws Exception {
expect(stateManager.changeState(
- Query.taskScoped(TASK_ID_A),
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
FAILED,
Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
- .andReturn(0);
+ .andReturn(false);
control.replay();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9698f28..d06ea11 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -324,10 +324,11 @@ public class TaskSchedulerTest extends EasyMockTest {
driver.launchTask(OFFER_A.getId(), mesosTask);
expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
expect(stateManager.changeState(
- Query.taskScoped("a").byStatus(PENDING),
+ "a",
+ Optional.of(PENDING),
LOST,
TaskSchedulerImpl.LAUNCH_FAILED_MSG))
- .andReturn(1);
+ .andReturn(true);
replayAndCreateScheduler();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 023905b..d4685f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
@@ -35,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -175,8 +175,12 @@ public class TaskTimeoutTest extends EasyMockTest {
expectTaskWatch();
expectCancel();
Capture<Runnable> killingTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(KILLING),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(true);
replayAndCreate();
@@ -188,8 +192,12 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testTimeout() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(ASSIGNED);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(ASSIGNED),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(true);
replayAndCreate();
@@ -202,8 +210,12 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testTaskDeleted() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(0);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(KILLING),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(false);
replayAndCreate();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 720d0c8..6ddea9f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -1453,7 +1453,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
ScheduleStatus status,
Optional<String> message) {
- scheduler.setTaskStatus(query, status, message);
+ for (String taskId : Tasks.ids(Storage.Util.consistentFetchTasks(storage, query))) {
+ scheduler.setTaskStatus(taskId, status, message);
+ }
}
public void changeStatus(Query.Builder query, ScheduleStatus status) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index c0addd6..f7f7632 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -55,7 +56,6 @@ import static org.junit.Assert.assertEquals;
public class MaintenanceControllerImplTest extends EasyMockTest {
private static final String HOST_A = "a";
- private static final String HOST_B = "b";
private static final Set<String> A = ImmutableSet.of(HOST_A);
private StorageTestUtil storageUtil;
@@ -102,10 +102,11 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
expectMaintenanceModeChange(HOST_A, SCHEDULED);
expectFetchTasksByHost(HOST_A, ImmutableSet.<ScheduledTask>of(task));
expect(stateManager.changeState(
- Query.slaveScoped(HOST_A).active(),
+ Tasks.id(task),
+ Optional.<ScheduleStatus>absent(),
ScheduleStatus.RESTARTING,
MaintenanceControllerImpl.DRAINING_MESSAGE))
- .andReturn(1);
+ .andReturn(true);
expectMaintenanceModeChange(HOST_A, DRAINING);
expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
.andReturn(Optional.of(new HostAttributes().setHost(HOST_A).setMode(DRAINING)));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b17b983..b93e47f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -52,6 +52,7 @@ import org.easymock.IAnswer;
import org.easymock.IArgumentMatcher;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
@@ -191,8 +192,8 @@ public class StateManagerImplTest extends EasyMockTest {
control.replay();
insertTask(task, 0);
- assertEquals(1, changeState(taskId, KILLING));
- assertEquals(0, changeState(taskId, KILLING));
+ assertEquals(true, changeState(taskId, KILLING));
+ assertEquals(false, changeState(taskId, KILLING));
}
@Test
@@ -217,16 +218,16 @@ public class StateManagerImplTest extends EasyMockTest {
@Test
public void testNestedEvents() {
- String id = "a";
+ final String id = "a";
ITaskConfig task = makeTask(JIM, MY_JOB);
expect(taskIdGenerator.generate(task, 0)).andReturn(id);
// Trigger an event that produces a side-effect and a PubSub event .
eventSink.post(matchStateChange(id, INIT, PENDING));
expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override public Void answer() throws Throwable {
- stateManager.changeState(
- Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
+ @Override
+ public Void answer() throws Throwable {
+ changeState(id, ASSIGNED);
return null;
}
});
@@ -253,6 +254,17 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager.deleteTasks(ImmutableSet.of(taskId));
}
+  @Test
+  public void testKillUnknownTask() {
+    // No task is inserted first, so the ID below is unknown to the state manager. The only
+    // expectation recorded before replay is a driver kill request for that same ID.
+    String taskId = "unknown";
+    driver.killTask(taskId);
+    control.replay();
+
+    changeState(taskId, RUNNING);
+  }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,
@@ -279,8 +291,12 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager.insertPendingTasks(ImmutableMap.of(instanceId, task));
}
- private int changeState(String taskId, ScheduleStatus status) {
- return stateManager.changeState(Query.taskScoped(taskId), status, Optional.<String>absent());
+  /**
+   * Asks the state manager to move {@code taskId} to {@code status}, passing absent values for
+   * both the optional status argument and the transition message.
+   *
+   * @return the result reported by {@code stateManager.changeState}
+   */
+  private boolean changeState(String taskId, ScheduleStatus status) {
+    Optional<ScheduleStatus> absentStatus = Optional.absent();
+    Optional<String> absentMessage = Optional.absent();
+    return stateManager.changeState(taskId, absentStatus, status, absentMessage);
}
private static ITaskConfig makeTask(Identity owner, String job) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/801dcfbd/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 91c1c24..b46f29a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -474,7 +474,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
String taskId = "task_id_foo";
ScheduleStatus status = ScheduleStatus.FAILED;
- scheduler.setTaskStatus(Query.taskScoped(taskId), status, transitionMessage(USER));
+ scheduler.setTaskStatus(taskId, status, transitionMessage(USER));
// Expect auth is first called by an interceptor and then by SchedulerThriftInterface to extract
// the SessionContext.
// Note: This will change after AOP-style session validation passes in a SessionContext.
[2/3] Refactor StateManagerImpl and TaskStateMachine for less code
and better readability.
Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index e89e60a..0a3a520 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -15,23 +15,27 @@
*/
package org.apache.aurora.scheduler.state;
+import java.util.Map;
import java.util.Set;
import com.google.common.base.Function;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.TaskStateMachine.WorkSink;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -43,74 +47,43 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.WorkCommand.DELETE;
-import static org.apache.aurora.scheduler.state.WorkCommand.INCREMENT_FAILURES;
-import static org.apache.aurora.scheduler.state.WorkCommand.KILL;
-import static org.apache.aurora.scheduler.state.WorkCommand.RESCHEDULE;
-import static org.apache.aurora.scheduler.state.WorkCommand.UPDATE_STATE;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TaskStateMachineTest extends EasyMockTest {
+// TODO(wfarner): At this rate, it's probably best to exhaustively cover this class with a matrix
+// from every state to every state.
+public class TaskStateMachineTest {
- private WorkSink workSink;
- private FakeClock clock;
private TaskStateMachine stateMachine;
@Before
public void setUp() {
- workSink = createMock(WorkSink.class);
- clock = new FakeClock();
- stateMachine = makeStateMachine("test", makeTask(false));
+ stateMachine = makeStateMachine(makeTask(false));
}
- private TaskStateMachine makeStateMachine(String taskId, ScheduledTask builder) {
- return new TaskStateMachine(
- taskId,
- IScheduledTask.build(builder),
- workSink,
- clock,
- INIT);
+ private TaskStateMachine makeStateMachine(ScheduledTask builder) {
+ return new TaskStateMachine(IScheduledTask.build(builder));
}
@Test
public void testSimpleTransition() {
- expectWork(UPDATE_STATE).times(5);
- expectWork(DELETE);
-
- control.replay();
-
- transition(stateMachine, PENDING);
- assertEquals(INIT, stateMachine.getPreviousState());
- transition(stateMachine, ASSIGNED);
- assertEquals(PENDING, stateMachine.getPreviousState());
- transition(stateMachine, STARTING);
- assertEquals(ASSIGNED, stateMachine.getPreviousState());
- transition(stateMachine, RUNNING);
- assertEquals(STARTING, stateMachine.getPreviousState());
- transition(stateMachine, FINISHED);
- assertEquals(RUNNING, stateMachine.getPreviousState());
- transition(stateMachine, UNKNOWN);
- assertEquals(FINISHED, stateMachine.getPreviousState());
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+ legalTransition(UNKNOWN, Action.DELETE);
}
@Test
public void testServiceRescheduled() {
- stateMachine = makeStateMachine("test", makeTask(true));
- expectWork(UPDATE_STATE).times(5);
- expectWork(RESCHEDULE);
-
- control.replay();
-
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FINISHED);
+ stateMachine = makeStateMachine(makeTask(true));
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
}
@Test
@@ -118,12 +91,12 @@ public class TaskStateMachineTest extends EasyMockTest {
Set<ScheduleStatus> terminalStates = Tasks.TERMINAL_STATES;
for (ScheduleStatus endState : terminalStates) {
- stateMachine = makeStateMachine("test", makeTask(false));
- expectWork(UPDATE_STATE).times(5);
+ stateMachine = makeStateMachine(makeTask(false));
+ Set<SideEffect.Action> finalActions = Sets.newHashSet(Action.SAVE_STATE);
switch (endState) {
case FAILED:
- expectWork(INCREMENT_FAILURES);
+ finalActions.add(Action.INCREMENT_FAILURES);
break;
case FINISHED:
@@ -131,138 +104,205 @@ public class TaskStateMachineTest extends EasyMockTest {
case KILLED:
case LOST:
- expectWork(RESCHEDULE);
+ finalActions.add(Action.RESCHEDULE);
break;
case KILLING:
- expectWork(KILL);
+ finalActions.add(Action.KILL);
break;
default:
fail("Unknown state " + endState);
}
- control.replay();
-
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, endState);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(endState, finalActions);
for (ScheduleStatus badTransition : terminalStates) {
- transition(stateMachine, badTransition);
+ illegalTransition(badTransition);
}
-
- control.verify();
- control.reset();
}
-
- control.replay(); // Needed so the teardown verify doesn't break.
}
@Test
public void testUnknownTask() {
- expectWork(KILL);
-
- control.replay();
+ stateMachine = new TaskStateMachine("id");
- transition(stateMachine, UNKNOWN, RUNNING);
+ illegalTransition(RUNNING, Action.KILL);
}
@Test
public void testLostTask() {
- expectWork(UPDATE_STATE).times(5);
- expectWork(RESCHEDULE);
-
- control.replay();
-
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, LOST);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE);
}
@Test
public void testKilledPending() {
- expectWork(UPDATE_STATE);
- expectWork(DELETE);
+ expectUpdateStateOnTransitionTo(PENDING);
+ legalTransition(KILLING, Action.DELETE);
+ }
- control.replay();
+ @Test
+ public void testMissingStartingRescheduledImmediately() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+ illegalTransition(UNKNOWN,
+ ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
+ }
- transition(stateMachine, PENDING, KILLING);
+ @Test
+ public void testMissingRunningRescheduledImmediately() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ illegalTransition(UNKNOWN,
+ ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
}
@Test
- public void testMissingStartingRescheduledImmediately() {
- ScheduledTask task = makeTask(false);
- task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
- stateMachine = makeStateMachine("test", task);
+ public void testRestartedTask() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ legalTransition(FINISHED, Action.SAVE_STATE, Action.RESCHEDULE);
+ }
- expectWork(UPDATE_STATE).times(4);
- expectWork(RESCHEDULE);
+ @Test
+ public void testRogueRestartedTask() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ illegalTransition(RUNNING, Action.KILL);
+ }
- control.replay();
+ @Test
+ public void testPendingRestartedTask() {
+ expectUpdateStateOnTransitionTo(PENDING);
+ // PENDING -> RESTARTING should not be allowed.
+ illegalTransition(RESTARTING);
+ }
- transition(stateMachine, PENDING, ASSIGNED, STARTING, UNKNOWN);
- assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+ @Test
+ public void testAllowsSkipStartingAndRunning() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, FINISHED);
}
@Test
- public void testMissingRunningRescheduledImmediately() {
- ScheduledTask task = makeTask(false);
- task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.PENDING));
- stateMachine = makeStateMachine("test", task);
+ public void testAllowsSkipRunning() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, FINISHED);
+ }
- expectWork(UPDATE_STATE).times(5);
- expectWork(RESCHEDULE);
+ @Test
+ public void testRestartingToAssigned() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ illegalTransition(ASSIGNED, Action.KILL);
+ }
- control.replay();
+ @Test
+ public void testRestartingToStarting() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ illegalTransition(STARTING, Action.KILL);
+ }
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, UNKNOWN);
- assertThat(stateMachine.getState(), is(ScheduleStatus.LOST));
+ @Test
+ public void testRestartingToFailed() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ legalTransition(FAILED, Action.RESCHEDULE, Action.SAVE_STATE);
}
@Test
- public void testRestartedTask() {
- expectWork(UPDATE_STATE).times(6);
- expectWork(KILL);
- expectWork(RESCHEDULE);
+ public void testRestartingToKilled() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ legalTransition(KILLED, Action.RESCHEDULE, Action.SAVE_STATE);
+ }
- control.replay();
+ @Test
+ public void testRestartingToLost() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ legalTransition(LOST, Action.SAVE_STATE, Action.KILL, Action.RESCHEDULE);
+ }
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, FINISHED);
+ @Test
+ public void testRestartingToUnknown() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ illegalTransition(UNKNOWN,
+ ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
}
@Test
- public void testRogueRestartedTask() {
- expectWork(UPDATE_STATE).times(5);
- expectWork(KILL).times(2);
+ public void testAssignedToPrempting() {
+ // NOTE(review): "Prempting" in this method name is a typo for "Preempting"; consider renaming.
+ // Drives the task to ASSIGNED, then expects ASSIGNED -> PREEMPTING to save state and kill.
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
+ }
- control.replay();
+ @Test
+ public void testAssignedToFailed() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(FAILED, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
+ }
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, RESTARTING, RUNNING);
+ @Test
+ public void testAssignedToRestarting() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
}
@Test
- public void testPendingRestartedTask() {
- expectWork(UPDATE_STATE).times(1);
+ public void testAssignedToKilled() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(KILLED, Action.SAVE_STATE, Action.RESCHEDULE);
+ }
- control.replay();
+ @Test
+ public void testAssignedToLost() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE, Action.KILL);
+ }
- // PENDING -> RESTARTING should not be allowed.
- transition(stateMachine, PENDING, RESTARTING);
+ @Test
+ public void testAssignedToKilling() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ legalTransition(KILLING, Action.SAVE_STATE, Action.KILL);
}
@Test
- public void testAllowsSkipStartingAndRunning() {
- expectWork(UPDATE_STATE).times(3);
+ public void testAssignedToMissing() {
+ // No action is taken for ASSIGNED -> UNKNOWN since this usually means the slave has not yet
+ // received the task. We rely on task timeouts to clean up tasks stalled in ASSIGNED.
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED);
+ illegalTransition(UNKNOWN);
+ }
- control.replay();
+ @Test
+ public void testStartingToRestarting() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+ legalTransition(RESTARTING, Action.SAVE_STATE, Action.KILL);
+ }
- transition(stateMachine, PENDING, ASSIGNED, FINISHED);
+ @Test
+ public void testStartingToPreempting() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+ legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
}
@Test
- public void testAllowsSkipRunning() {
- expectWork(UPDATE_STATE).times(4);
+ public void testStartingToKilled() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+ legalTransition(KILLED, Action.SAVE_STATE, Action.RESCHEDULE);
+ }
- control.replay();
+ @Test
+ public void testStartingToLost() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING);
+ legalTransition(LOST, Action.SAVE_STATE, Action.RESCHEDULE);
+ }
- transition(stateMachine, PENDING, ASSIGNED, STARTING, FINISHED);
+ @Test
+ public void testRunningToPreempting() {
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, RUNNING);
+ legalTransition(PREEMPTING, Action.SAVE_STATE, Action.KILL);
}
@Test
@@ -270,23 +310,15 @@ public class TaskStateMachineTest extends EasyMockTest {
ScheduledTask task = makeTask(false);
task.getAssignedTask().getTask().setMaxTaskFailures(10);
task.setFailureCount(8);
- stateMachine = makeStateMachine("test", task);
-
- expectWork(UPDATE_STATE).times(5);
- expectWork(RESCHEDULE);
- expectWork(INCREMENT_FAILURES);
+ stateMachine = makeStateMachine(task);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
ScheduledTask rescheduled = task.deepCopy();
rescheduled.setFailureCount(9);
- TaskStateMachine rescheduledMachine = makeStateMachine("test2", rescheduled);
- expectWork(UPDATE_STATE, rescheduledMachine).times(5);
- expectWork(INCREMENT_FAILURES, rescheduledMachine);
-
- control.replay();
-
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
-
- transition(rescheduledMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+ stateMachine = makeStateMachine(rescheduled);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(FAILED, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
}
@Test
@@ -294,53 +326,306 @@ public class TaskStateMachineTest extends EasyMockTest {
ScheduledTask task = makeTask(false);
task.getAssignedTask().getTask().setMaxTaskFailures(-1);
task.setFailureCount(1000);
- stateMachine = makeStateMachine("test", task);
+ stateMachine = makeStateMachine(task);
- expectWork(UPDATE_STATE).times(5);
- expectWork(RESCHEDULE);
- expectWork(INCREMENT_FAILURES);
-
- control.replay();
-
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, FAILED);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(FAILED, Action.SAVE_STATE, Action.RESCHEDULE, Action.INCREMENT_FAILURES);
}
@Test
public void testKillingRequest() {
- expectWork(UPDATE_STATE).times(6);
- expectWork(KILL);
+ expectUpdateStateOnTransitionTo(PENDING, ASSIGNED, STARTING, RUNNING);
+ legalTransition(KILLING, Action.KILL, Action.SAVE_STATE);
+ expectUpdateStateOnTransitionTo(KILLED);
+ }
- control.replay();
+  /** Wraps a bare {@link Action} in a {@link SideEffect} whose status is absent. */
+  private static final Function<Action, SideEffect> TO_SIDE_EFFECT =
+      new Function<Action, SideEffect>() {
+        @Override
+        public SideEffect apply(Action input) {
+          Optional<ScheduleStatus> noStatus = Optional.absent();
+          return new SideEffect(input, noStatus);
+        }
+      };
- transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
+  /** Varargs convenience that forwards to {@code legalTransition(ScheduleStatus, Set)}. */
+  private void legalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    Set<SideEffect.Action> actions = ImmutableSet.copyOf(expectedActions);
+    legalTransition(state, actions);
}
- private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
+  /**
+   * Moves {@code stateMachine} to {@code state} and asserts that the transition succeeds, that the
+   * previous state becomes the state held before the call, that the current state becomes
+   * {@code state}, and that the reported side effects are exactly {@code expectedActions}, each
+   * paired with an absent status.
+   */
+  private void legalTransition(ScheduleStatus state, Set<SideEffect.Action> expectedActions) {
+    ScheduleStatus before = stateMachine.getState();
+    TransitionResult outcome = stateMachine.updateState(state);
+
+    assertTrue("Transition to " + state + " was not successful", outcome.isSuccess());
+    assertEquals(before, stateMachine.getPreviousState());
+    assertEquals(state, stateMachine.getState());
+
+    Set<SideEffect> expectedEffects =
+        FluentIterable.from(expectedActions).transform(TO_SIDE_EFFECT).toSet();
+    assertEquals(expectedEffects, outcome.getSideEffects());
+  }
+
+ private void expectUpdateStateOnTransitionTo(ScheduleStatus... states) {
for (ScheduleStatus status : states) {
- stateMachine.updateState(status);
+ legalTransition(status, Action.SAVE_STATE);
}
}
- private IExpectationSetters<Void> expectWork(WorkCommand work) {
- return expectWork(work, stateMachine);
+  /**
+   * Varargs convenience that forwards to {@code illegalTransition(ScheduleStatus, Set)}, after
+   * turning each action into a side effect with an absent status.
+   */
+  private void illegalTransition(ScheduleStatus state, SideEffect.Action... expectedActions) {
+    Set<SideEffect> expectedEffects = FluentIterable
+        .from(ImmutableSet.copyOf(expectedActions))
+        .transform(TO_SIDE_EFFECT)
+        .toSet();
+    illegalTransition(state, expectedEffects);
}
- private IExpectationSetters<Void> expectWork(WorkCommand work, TaskStateMachine machine) {
- workSink.addWork(
- eq(work),
- eq(machine),
- EasyMock.<Function<IScheduledTask, IScheduledTask>>anyObject());
- return expectLastCall();
+  /**
+   * Attempts to move {@code stateMachine} to {@code state} and asserts that the current state is
+   * left unchanged, that the transition is reported as unsuccessful, and that the reported side
+   * effects equal {@code sideEffects}.
+   */
+  private void illegalTransition(ScheduleStatus state, Set<SideEffect> sideEffects) {
+    ScheduleStatus initialStatus = stateMachine.getState();
+    TransitionResult result = stateMachine.updateState(state);
+    assertEquals(initialStatus, stateMachine.getState());
+    // Name the offending target state on failure, matching the message legalTransition uses.
+    assertFalse("Transition to " + state + " was unexpectedly successful", result.isSuccess());
+    assertEquals(sideEffects, result.getSideEffects());
}
private static ScheduledTask makeTask(boolean service) {
return new ScheduledTask()
+ .setStatus(INIT)
.setAssignedTask(
new AssignedTask()
+ .setTaskId("test")
.setTask(
new TaskConfig()
.setOwner(new Identity().setRole("roleA"))
.setJobName("jobA")
.setIsService(service)));
}
+
+  /**
+   * Builds a {@link TransitionResult} with the given success flag whose side effects are the given
+   * actions, each paired with an absent status.
+   */
+  private static TransitionResult transitionResult(boolean success, Action... actions) {
+    ImmutableSet.Builder<SideEffect> effects = ImmutableSet.builder();
+    for (Action action : actions) {
+      effects.add(new SideEffect(action, Optional.<ScheduleStatus>absent()));
+    }
+    return new TransitionResult(success, effects.build());
+  }
+
+  private static final TransitionResult LEGAL_NO_ACTION = transitionResult(true);
+  private static final TransitionResult SAVE = transitionResult(true, Action.SAVE_STATE);
+  private static final TransitionResult SAVE_AND_KILL =
+      transitionResult(true, Action.SAVE_STATE, Action.KILL);
+  private static final TransitionResult SAVE_AND_RESCHEDULE =
+      transitionResult(true, Action.SAVE_STATE, Action.RESCHEDULE);
+  private static final TransitionResult SAVE_KILL_AND_RESCHEDULE =
+      transitionResult(true, Action.SAVE_STATE, Action.KILL, Action.RESCHEDULE);
+  private static final TransitionResult ILLEGAL_KILL = transitionResult(false, Action.KILL);
+  private static final TransitionResult RECORD_FAILURE =
+      transitionResult(true, Action.SAVE_STATE, Action.INCREMENT_FAILURES);
+  private static final TransitionResult DELETE_TASK = transitionResult(true, Action.DELETE);
+  // Unsuccessful result whose single side effect carries a status: STATE_CHANGE paired with LOST.
+  private static final TransitionResult MARK_LOST = new TransitionResult(
+      false,
+      ImmutableSet.of(new SideEffect(Action.STATE_CHANGE, Optional.of(LOST))));
+
+  /** Key into the transition expectation table: a task-presence flag plus a (from, to) pair. */
+  private static final class TestCase {
+    private final boolean taskPresent;
+    private final ScheduleStatus from;
+    private final ScheduleStatus to;
+
+    private TestCase(boolean taskPresent, ScheduleStatus from, ScheduleStatus to) {
+      this.taskPresent = taskPresent;
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof TestCase)) {
+        return false;
+      }
+      TestCase that = (TestCase) other;
+      // Enum constants are singletons, so reference comparison is exact.
+      return taskPresent == that.taskPresent && from == that.from && to == that.to;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskPresent, from, to);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("taskPresent", taskPresent)
+          .add("from", from)
+          .add("to", to)
+          .toString();
+    }
+  }
+
+ // Expected outcome for each (taskPresent, from, to) combination. Any combination without an
+ // entry is expected to be rejected with no side effects: exhaustivelyTestTransitions falls back
+ // to new TransitionResult(false, <empty set>) when a lookup here misses.
+ // NOTE(review): taskPresent presumably selects whether the machine under test wraps a stored
+ // task; the code that builds that machine is outside this view, so confirm there.
+ // Recurring patterns: every taskPresent == false row targets ASSIGNED, STARTING or RUNNING and
+ // expects ILLEGAL_KILL; FINISHED, FAILED, KILLED, KILLING and LOST all map UNKNOWN to DELETE_TASK.
+ private static final Map<TestCase, TransitionResult> EXPECTATIONS =
+ ImmutableMap.<TestCase, TransitionResult>builder()
+ // From INIT.
+ .put(new TestCase(true, INIT, PENDING), SAVE)
+ .put(new TestCase(false, INIT, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, INIT, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, INIT, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, INIT, UNKNOWN), LEGAL_NO_ACTION)
+ // From THROTTLED.
+ .put(new TestCase(false, THROTTLED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, THROTTLED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, THROTTLED, RUNNING), ILLEGAL_KILL)
+ // From PENDING: killing a pending task deletes it rather than saving state and killing.
+ .put(new TestCase(true, PENDING, ASSIGNED), SAVE)
+ .put(new TestCase(false, PENDING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, PENDING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, PENDING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, PENDING, KILLING), DELETE_TASK)
+ // From ASSIGNED: unlike STARTING and RUNNING, a move to LOST also expects KILL.
+ .put(new TestCase(false, ASSIGNED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, ASSIGNED, STARTING), SAVE)
+ .put(new TestCase(false, ASSIGNED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, ASSIGNED, RUNNING), SAVE)
+ .put(new TestCase(false, ASSIGNED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, ASSIGNED, FINISHED), SAVE)
+ .put(new TestCase(true, ASSIGNED, PREEMPTING), SAVE_AND_KILL)
+ .put(new TestCase(true, ASSIGNED, RESTARTING), SAVE_AND_KILL)
+ .put(new TestCase(true, ASSIGNED, FAILED), RECORD_FAILURE)
+ .put(new TestCase(true, ASSIGNED, KILLED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, ASSIGNED, KILLING), SAVE_AND_KILL)
+ .put(new TestCase(true, ASSIGNED, LOST), SAVE_KILL_AND_RESCHEDULE)
+ // From STARTING: a move to UNKNOWN is rejected but requests a state change to LOST.
+ .put(new TestCase(false, STARTING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, STARTING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, STARTING, RUNNING), SAVE)
+ .put(new TestCase(false, STARTING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, STARTING, FINISHED), SAVE)
+ .put(new TestCase(true, STARTING, PREEMPTING), SAVE_AND_KILL)
+ .put(new TestCase(true, STARTING, RESTARTING), SAVE_AND_KILL)
+ .put(new TestCase(true, STARTING, FAILED), RECORD_FAILURE)
+ .put(new TestCase(true, STARTING, KILLED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, STARTING, KILLING), SAVE_AND_KILL)
+ .put(new TestCase(true, STARTING, LOST), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, STARTING, UNKNOWN), MARK_LOST)
+ // From RUNNING: same shape as STARTING.
+ .put(new TestCase(false, RUNNING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, RUNNING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, RUNNING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, RUNNING, FINISHED), SAVE)
+ .put(new TestCase(true, RUNNING, PREEMPTING), SAVE_AND_KILL)
+ .put(new TestCase(true, RUNNING, RESTARTING), SAVE_AND_KILL)
+ .put(new TestCase(true, RUNNING, FAILED), RECORD_FAILURE)
+ .put(new TestCase(true, RUNNING, KILLED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, RUNNING, KILLING), SAVE_AND_KILL)
+ .put(new TestCase(true, RUNNING, LOST), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, RUNNING, UNKNOWN), MARK_LOST)
+ // From FINISHED.
+ .put(new TestCase(true, FINISHED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, FINISHED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, FINISHED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, FINISHED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, FINISHED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, FINISHED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, FINISHED, UNKNOWN), DELETE_TASK)
+ // From PREEMPTING.
+ .put(new TestCase(true, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, PREEMPTING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, PREEMPTING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, PREEMPTING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, PREEMPTING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, PREEMPTING, FINISHED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, PREEMPTING, FAILED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, PREEMPTING, KILLED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, PREEMPTING, KILLING), SAVE)
+ .put(new TestCase(true, PREEMPTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+ .put(new TestCase(true, PREEMPTING, UNKNOWN), MARK_LOST)
+ // From RESTARTING: same expectations as PREEMPTING.
+ .put(new TestCase(true, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, RESTARTING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, RESTARTING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, RESTARTING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, RESTARTING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, RESTARTING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, RESTARTING, FINISHED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, RESTARTING, FAILED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, RESTARTING, KILLED), SAVE_AND_RESCHEDULE)
+ .put(new TestCase(true, RESTARTING, KILLING), SAVE)
+ .put(new TestCase(true, RESTARTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+ .put(new TestCase(true, RESTARTING, UNKNOWN), MARK_LOST)
+ // From FAILED.
+ .put(new TestCase(true, FAILED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, FAILED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, FAILED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, FAILED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, FAILED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, FAILED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, FAILED, UNKNOWN), DELETE_TASK)
+ // From KILLED.
+ .put(new TestCase(true, KILLED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLED, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLED, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLED, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLED, UNKNOWN), DELETE_TASK)
+ // From KILLING: terminal outcomes only save state (no reschedule).
+ .put(new TestCase(true, KILLING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLING, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLING, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, KILLING, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, KILLING, FINISHED), SAVE)
+ .put(new TestCase(true, KILLING, FAILED), SAVE)
+ .put(new TestCase(true, KILLING, KILLED), SAVE)
+ .put(new TestCase(true, KILLING, LOST), SAVE)
+ .put(new TestCase(true, KILLING, UNKNOWN), DELETE_TASK)
+ // From LOST.
+ .put(new TestCase(true, LOST, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, LOST, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(true, LOST, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, LOST, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(true, LOST, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(false, LOST, RUNNING), ILLEGAL_KILL)
+ .put(new TestCase(true, LOST, UNKNOWN), DELETE_TASK)
+ // From UNKNOWN.
+ .put(new TestCase(false, UNKNOWN, ASSIGNED), ILLEGAL_KILL)
+ .put(new TestCase(false, UNKNOWN, STARTING), ILLEGAL_KILL)
+ .put(new TestCase(false, UNKNOWN, RUNNING), ILLEGAL_KILL)
+ .build();
+
+  @Test
+  public void exhaustivelyTestTransitions() {
+    for (ScheduleStatus from : ScheduleStatus.values()) {
+      for (ScheduleStatus to : ScheduleStatus.values()) {
+        for (Boolean taskPresent : ImmutableList.of(Boolean.TRUE, Boolean.FALSE)) {
+          TestCase testCase = new TestCase(taskPresent, from, to);
+          // Any combination absent from EXPECTATIONS must be rejected with no side effects.
+          TransitionResult expectation = EXPECTATIONS.get(testCase);
+          if (expectation == null) {
+            expectation = new TransitionResult(false, ImmutableSet.<SideEffect>of());
+          }
+
+          TaskStateMachine machine;
+          if (taskPresent) {
+            // Cannot create a state machine for an UNKNOWN task that is in the store.
+            boolean expectException = from == UNKNOWN;
+            try {
+              machine =
+                  new TaskStateMachine(IScheduledTask.build(makeTask(false).setStatus(from)));
+              if (expectException) {
+                fail("Expected IllegalStateException for a stored task in state " + from);
+              }
+            } catch (IllegalStateException e) {
+              if (expectException) {
+                // Construction was correctly rejected; there is no machine to transition.
+                continue;
+              }
+              throw e;
+            }
+          } else {
+            machine = new TaskStateMachine("name");
+          }
+          // The full transition result (success flag and side effects) must match exactly.
+          assertEquals(
+              "Unexpected behavior for " + testCase,
+              expectation,
+              machine.updateState(to));
+        }
+      }
+    }
+  }
}
[3/3] git commit: Refactor StateManagerImpl and TaskStateMachine for
less code and better readability.
Posted by wf...@apache.org.
Refactor StateManagerImpl and TaskStateMachine for less code and better readability.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/70adc19a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/70adc19a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/70adc19a
Branch: refs/heads/wfarner/state_machine_refactor
Commit: 70adc19ad99be08de93fd2b501b7f8bab83b0953
Parents: 801dcfb
Author: Bill Farner <bi...@twitter.com>
Authored: Thu Jan 9 16:07:56 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Jan 13 12:26:17 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/state/SideEffect.java | 94 +++
.../scheduler/state/SideEffectStorage.java | 169 ------
.../scheduler/state/StateManagerImpl.java | 500 +++++++--------
.../scheduler/state/TaskStateMachine.java | 546 +++++++----------
.../aurora/scheduler/state/WorkCommand.java | 33 -
.../aurora/scheduler/storage/Storage.java | 4 +
.../aurora/scheduler/storage/TaskStore.java | 2 +
.../scheduler/state/StateManagerImplTest.java | 95 ++-
.../scheduler/state/TaskStateMachineTest.java | 601 ++++++++++++++-----
9 files changed, 1116 insertions(+), 928 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..5759691
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * A side effect requested by a task state machine transition, e.g. persisting the new state,
+ * killing or rescheduling the task, or chaining an additional state change.
+ */
+class SideEffect {
+  private final Action action;
+  private final Optional<ScheduleStatus> nextState;  // Always present for STATE_CHANGE.
+
+  SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+    this.action = Preconditions.checkNotNull(action);
+    this.nextState = Preconditions.checkNotNull(nextState);
+    if (action == Action.STATE_CHANGE) {
+      Preconditions.checkArgument(
+          nextState.isPresent(),
+          "A next state must be provided for a state change action.");
+    }
+  }
+
+  public Action getAction() {
+    return action;
+  }
+
+  public Optional<ScheduleStatus> getNextState() {
+    return nextState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equal(action, other.action)
+        && Objects.equal(nextState, other.nextState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(action, nextState);
+  }
+
+  @Override
+  public String toString() {
+    if (nextState.isPresent()) {
+      return action.toString() + " " + nextState.get();
+    } else {
+      return action.toString();
+    }
+  }
+
+  enum Action {
+    // Send an instruction for the runner of this task to kill the task.
+    KILL,
+
+    // Create a new state machine with a copy of this task.
+    RESCHEDULE,
+
+    // Update the task's state (schedule status) in the persistent store to match the state machine.
+    SAVE_STATE,
+
+    // Delete this task from the persistent store.
+    DELETE,
+
+    // Increment the failure count for this task.
+    INCREMENT_FAILURES,
+
+    // Perform an additional state change on the task.
+    STATE_CHANGE
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
- private final Queue<PubsubEvent> events = Lists.newLinkedList();
- @VisibleForTesting
- Queue<PubsubEvent> getEvents() {
- return events;
- }
-
- private AtomicBoolean inOperation = new AtomicBoolean(false);
-
- private final Storage storage;
- private final OperationFinalizer operationFinalizer;
- private final EventSink eventSink;
-
- interface OperationFinalizer {
- /**
- * Performs any work necessary to complete the operation.
- * This is executed in the context of a write operation, immediately after the work
- * executes normally.
- * NOTE: At present, this is executed for every nesting level of operations, rather than
- * at the completion of the top-level operation.
- * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
- * for more detail.
- *
- * @param work Work to finalize.
- * @param storeProvider Mutable store reference.
- */
- void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
- }
-
- SideEffectStorage(
- Storage storage,
- OperationFinalizer operationFinalizer,
- EventSink eventSink) {
-
- this.storage = checkNotNull(storage);
- this.operationFinalizer = checkNotNull(operationFinalizer);
- this.eventSink = checkNotNull(eventSink);
- }
-
- /**
- * Perform a unit of work in a mutating operation. This supports nesting/reentrancy.
- *
- * @param work Work to perform.
- * @param <T> Work return type
- * @param <E> Work exception type.
- * @return The work return value.
- * @throws E The work exception.
- */
- <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
- return storage.write(executeSideEffectsAfter(work));
- }
-
- <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return storage.consistentRead(work);
- }
-
- /**
- * Work that has side effects external to the storage system.
- * Work may add side effect and pubsub events, which will be executed/sent upon normal
- * completion of the operation.
- *
- * @param <T> Work return type.
- * @param <E> Work exception type.
- */
- abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
- protected final void addTaskEvent(PubsubEvent notice) {
- Preconditions.checkState(inOperation.get());
- events.add(Preconditions.checkNotNull(notice));
- }
- }
-
- /**
- * Work with side effects which does not throw checked exceptions.
- *
- * @param <T> Work return type.
- */
- abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
- }
-
- /**
- * Work with side effects that does not have a return value.
- *
- * @param <E> Work exception type.
- */
- abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
- @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
- execute(storeProvider);
- return null;
- }
-
- abstract void execute(MutableStoreProvider storeProvider) throws E;
- }
-
- /**
- * Work with side effects which does not throw checked exceptions or have a return
- * value.
- */
- abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
- }
-
- private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
- final SideEffectWork<T, E> work) {
-
- return new MutateWork<T, E>() {
- @Override public T apply(MutableStoreProvider storeProvider) throws E {
- boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
- try {
- T result = work.apply(storeProvider);
-
- // TODO(William Farner): Maintaining this since it matches prior behavior, but this
- // seems wrong. Double-check whether this is necessary, or if only the top-level
- // operation should be executing the finalizer. Update doc on OperationFinalizer
- // once this is assessed.
- operationFinalizer.finalize(work, storeProvider);
- if (topLevelOperation) {
- while (!events.isEmpty()) {
- eventSink.post(events.remove());
- }
- }
- return result;
- } finally {
- if (topLevelOperation) {
- inOperation.set(false);
- }
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6078eee..819d921 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,42 +15,48 @@
*/
package org.apache.aurora.scheduler.state;
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -62,11 +68,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.transform;
import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
/**
* Manager of all persistence-related operations for the scheduler. Acts as a controller for
@@ -78,38 +82,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
- private final SideEffectStorage storage;
- @VisibleForTesting
- SideEffectStorage getStorage() {
- return storage;
- }
-
+ private final Storage storage;
+ private final Clock clock;
+ private final Driver driver;
private final TaskIdGenerator taskIdGenerator;
-
- // Work queue to receive state machine side effect work.
- // Items are sorted to place DELETE entries last. This is to ensure that within an operation,
- // a delete is always processed after a state transition.
- private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
- new Comparator<WorkEntry>() {
- @Override public int compare(WorkEntry a, WorkEntry b) {
- if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
- return (a.command == WorkCommand.DELETE) ? 1 : -1;
- } else {
- return 0;
- }
- }
- });
-
- // Adapt the work queue into a sink.
- private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
- @Override public void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- workQueue.add(new WorkEntry(work, stateMachine, mutation));
- }
- };
+ private final EventSink eventSink;
private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
@@ -125,28 +102,6 @@ public class StateManagerImpl implements StateManager {
}
};
- private final Driver driver;
- private final Clock clock;
-
- /**
- * An item of work on the work queue.
- */
- private static class WorkEntry {
- private final WorkCommand command;
- private final TaskStateMachine stateMachine;
- private final Function<IScheduledTask, IScheduledTask> mutation;
-
- WorkEntry(
- WorkCommand command,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- this.command = command;
- this.stateMachine = stateMachine;
- this.mutation = mutation;
- }
- }
-
@Inject
StateManagerImpl(
final Storage storage,
@@ -155,20 +110,11 @@ public class StateManagerImpl implements StateManager {
TaskIdGenerator taskIdGenerator,
EventSink eventSink) {
- checkNotNull(storage);
+ this.storage = checkNotNull(storage);
this.clock = checkNotNull(clock);
-
- OperationFinalizer finalizer = new OperationFinalizer() {
- @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
- processWorkQueueInWriteOperation(work, store);
- }
- };
-
- this.storage = new SideEffectStorage(storage, finalizer, eventSink);
this.driver = checkNotNull(driver);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
- Stats.exportSize("work_queue_depth", workQueue);
+ this.eventSink = checkNotNull(eventSink);
}
@Override
@@ -179,12 +125,16 @@ public class StateManagerImpl implements StateManager {
final Set<IScheduledTask> scheduledTasks =
ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
- storage.write(storage.new NoResultQuietSideEffectWork() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
for (IScheduledTask task : scheduledTasks) {
- createStateMachine(task).updateState(PENDING);
+ updateTaskAndExternalState(
+ Tasks.id(task),
+ Optional.of(task),
+ PENDING,
+ Optional.<String>absent());
}
}
});
@@ -197,54 +147,55 @@ public class StateManagerImpl implements StateManager {
final ScheduleStatus newState,
final Optional<String> auditMessage) {
- return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
- @Override
- public Boolean apply(TaskStateMachine stateMachine) {
- return stateMachine.updateState(newState, auditMessage);
- }
- });
+ return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
}
@Override
public IAssignedTask assignTask(
- String taskId,
- String slaveHost,
- SlaveID slaveId,
- Set<Integer> assignedPorts) {
+ final String taskId,
+ final String slaveHost,
+ final SlaveID slaveId,
+ final Set<Integer> assignedPorts) {
checkNotBlank(taskId);
checkNotBlank(slaveHost);
+ checkNotNull(slaveId);
checkNotNull(assignedPorts);
- TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
- changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
- return mutation.getAssignedTask();
- }
-
- private boolean changeState(
- final String taskId,
- final Optional<ScheduleStatus> casState,
- final Function<TaskStateMachine, Boolean> stateChange) {
-
- return storage.write(storage.new QuietSideEffectWork<Boolean>() {
- @Override public Boolean apply(MutableStoreProvider storeProvider) {
- IScheduledTask task = Iterables.getOnlyElement(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
- null);
- if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
- return false;
- }
+ return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+ @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+ boolean success = updateTaskAndExternalState(
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ ASSIGNED,
+ Optional.<String>absent());
+
+ Preconditions.checkState(
+ success,
+ "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+ Query.Builder query = Query.taskScoped(taskId);
+ storeProvider.getUnsafeTaskStore().mutateTasks(query,
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override
+ public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask builder = task.newBuilder();
+ AssignedTask assigned = builder.getAssignedTask();
+ assigned.setAssignedPorts(
+ getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+ assigned.setSlaveHost(slaveHost)
+ .setSlaveId(slaveId.getValue());
+ return IScheduledTask.build(builder);
+ }
+ });
- return stateChange.apply(getStateMachine(taskId, task));
+ return Iterables.getOnlyElement(
+ Iterables.transform(
+ storeProvider.getTaskStore().fetchTasks(query),
+ Tasks.SCHEDULED_TO_ASSIGNED));
}
});
}
- private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
- IAssignedTask getAssignedTask();
- }
-
private static Map<String, Integer> getNameMappedPorts(
Set<String> portNames,
Set<Integer> allocatedPorts) {
@@ -270,157 +221,218 @@ public class StateManagerImpl implements StateManager {
return ports;
}
- private TaskAssignMutation assignHost(
- final String slaveHost,
- final SlaveID slaveId,
- final Set<Integer> assignedPorts) {
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(  // Cached once resolved.
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            LOG.log(Level.SEVERE, "Failed to get self hostname.", e);  // Keep the cause in the log.
+            throw Throwables.propagate(e);
+          }
+        }
+      });
- final TaskMutation mutation = new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- AssignedTask assigned = builder.getAssignedTask();
- assigned.setAssignedPorts(
- getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
- assigned.setSlaveHost(slaveHost)
- .setSlaveId(slaveId.getValue());
- return IScheduledTask.build(builder);
- }
- };
+ private boolean updateTaskAndExternalState(
+ final Optional<ScheduleStatus> casState,
+ final String taskId,
+ final ScheduleStatus targetState,
+ final Optional<String> transitionMessage) {
- return new TaskAssignMutation() {
- private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
- @Override public IAssignedTask getAssignedTask() {
- return assignedTask.get();
- }
+ return storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+ null));
- @Override public Boolean apply(final TaskStateMachine stateMachine) {
- TaskMutation wrapper = new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- IScheduledTask mutated = mutation.apply(task);
- Preconditions.checkState(
- assignedTask.compareAndSet(null, mutated.getAssignedTask()),
- "More than one result was found for an identity query.");
- return mutated;
- }
- };
- return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+ // CAS operation fails if the task does not exist, or the states don't match.
+ if (casState.isPresent()
+ && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+ return false;
+ }
+
+ return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
}
- };
+ });
}
- private void processWorkQueueInWriteOperation(
- SideEffectWork<?, ?> sideEffectWork,
- MutableStoreProvider storeProvider) {
-
- for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
- final TaskStateMachine stateMachine = work.stateMachine;
-
- if (work.command == WorkCommand.KILL) {
- driver.killTask(stateMachine.getTaskId());
- } else {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- String taskId = stateMachine.getTaskId();
- Query.Builder idQuery = Query.taskScoped(taskId);
-
- switch (work.command) {
- case RESCHEDULE:
- ScheduledTask builder =
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
- builder.getAssignedTask().unsetSlaveId();
- builder.getAssignedTask().unsetSlaveHost();
- builder.getAssignedTask().unsetAssignedPorts();
- builder.unsetTaskEvents();
- builder.setAncestorId(taskId);
- String newTaskId = taskIdGenerator.generate(
- ITaskConfig.build(builder.getAssignedTask().getTask()),
- builder.getAssignedTask().getInstanceId());
- builder.getAssignedTask().setTaskId(newTaskId);
-
- LOG.info("Task being rescheduled: " + taskId);
-
- IScheduledTask task = IScheduledTask.build(builder);
- taskStore.saveTasks(ImmutableSet.of(task));
-
- createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
- ITaskConfig taskInfo = task.getAssignedTask().getTask();
- sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskRescheduled(
- taskInfo.getOwner().getRole(),
- taskInfo.getJobName(),
- task.getAssignedTask().getInstanceId()));
- break;
-
- case UPDATE_STATE:
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return work.mutation.apply(
- IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
- }
- });
- sideEffectWork.addTaskEvent(
- PubsubEvent.TaskStateChange.transition(
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
- stateMachine.getPreviousState()));
- break;
-
- case DELETE:
- deleteTasks(ImmutableSet.of(taskId));
- break;
-
- case INCREMENT_FAILURES:
- taskStore.mutateTasks(idQuery, new TaskMutation() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- return IScheduledTask.build(
- task.newBuilder().setFailureCount(task.getFailureCount() + 1));
- }
- });
- break;
+  private static final Function<SideEffect, Action> GET_ACTION =
+      new Function<SideEffect, Action>() {
+        @Override public Action apply(SideEffect sideEffect) {
+          return sideEffect.getAction();
+        }
+      };
+
+  private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+      Action.INCREMENT_FAILURES,
+      Action.SAVE_STATE,
+      Action.STATE_CHANGE,
+      Action.RESCHEDULE,
+      Action.KILL,
+      Action.DELETE);
+  static {
+    // Every Action needs a position here; Ordering.explicit() rejects values it was not given.
+    Set<Action> unordered = Sets.difference(
+        ImmutableSet.copyOf(Action.values()), ImmutableSet.copyOf(ACTIONS_IN_ORDER));
+    Preconditions.checkState(unordered.isEmpty(), "Actions missing from ordering: %s", unordered);
+  }
+
+  // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+  // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+  // (thus not carrying forward the failure increment).
+  private static final Ordering<SideEffect> ACTION_ORDER =
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
- default:
- LOG.severe("Unrecognized work command type " + work.command);
+ private boolean updateTaskAndExternalState(
+ final String taskId,
+ // Note: This argument is deliberately non-final, and should not be made final.
+ // This is because using the captured value within the storage operation below is
+ // highly-risky, since it doesn't necessarily represent the value in storage.
+ // As a result, it would be easy to accidentally clobber mutations.
+ Optional<IScheduledTask> task,
+ final ScheduleStatus targetState,
+ final Optional<String> transitionMessage) {
+ // A caller-supplied task must be the one identified by taskId.
+ if (task.isPresent()) {
+ Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+ }
+ // Events are buffered here and posted only after the storage write below has returned.
+ final List<PubsubEvent> events = Lists.newArrayList();
+ // Seed the state machine from the supplied task, or as absent when no task was supplied.
+ final TaskStateMachine stateMachine = task.isPresent()
+ ? new TaskStateMachine(task.get())
+ : new TaskStateMachine(taskId);
+ // Apply the transition and all of its side effects within a single storage write.
+ boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ TransitionResult result = stateMachine.updateState(targetState);
+ Query.Builder query = Query.taskScoped(taskId);
+ // Side effects run in ACTION_ORDER; each re-reads the task, as earlier ones may mutate it.
+ for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+ Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+ Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+ switch (sideEffect.getAction()) {
+ case KILL:
+ driver.killTask(taskId);
+ break;
+
+ case RESCHEDULE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+ LOG.info("Task being rescheduled: " + taskId);
+
+ ScheduledTask builder = upToDateTask.get().newBuilder();
+ builder.setStatus(INIT);
+ builder.getAssignedTask().unsetSlaveId();
+ builder.getAssignedTask().unsetSlaveHost();
+ builder.getAssignedTask().unsetAssignedPorts();
+ builder.unsetTaskEvents();
+ builder.setAncestorId(taskId);
+ String newTaskId = taskIdGenerator.generate(
+ ITaskConfig.build(builder.getAssignedTask().getTask()),
+ builder.getAssignedTask().getInstanceId());
+ builder.getAssignedTask().setTaskId(newTaskId);
+ // Persist the copy, then move it to PENDING via a nested transition.
+ IScheduledTask newTask = IScheduledTask.build(builder);
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+ updateTaskAndExternalState(
+ newTaskId,
+ Optional.of(newTask),
+ PENDING, Optional.of("Rescheduled"));
+
+ ITaskConfig taskInfo = newTask.getAssignedTask().getTask();
+ events.add(
+ new PubsubEvent.TaskRescheduled(
+ taskInfo.getOwner().getRole(),
+ taskInfo.getJobName(),
+ newTask.getAssignedTask().getInstanceId()));
+ break;
+
+ case SAVE_STATE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+ // Store the machine's new status and append an audit TaskEvent for this transition.
+ storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ ScheduledTask mutableTask = task.newBuilder();
+ mutableTask.setStatus(stateMachine.getState());
+ mutableTask.addToTaskEvents(new TaskEvent()
+ .setTimestamp(clock.nowMillis())
+ .setStatus(targetState)
+ .setMessage(transitionMessage.orNull())
+ .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+ return IScheduledTask.build(mutableTask);
+ }
+ });
+ events.add(
+ PubsubEvent.TaskStateChange.transition(
+ Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+ stateMachine.getPreviousState()));
+ break;
+
+ case DELETE:
+ Preconditions.checkState(
+ upToDateTask.isPresent(),
+ "Operation expected task " + taskId + " to be present.");
+
+ events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+ break;
+
+ case INCREMENT_FAILURES:
+ storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ return IScheduledTask.build(
+ task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+ }
+ });
+ break;
+ // Chain a follow-up transition; the nested call re-fetches the task from storage.
+ case STATE_CHANGE:
+ updateTaskAndExternalState(
+ Optional.<ScheduleStatus>absent(),
+ taskId,
+ sideEffect.getNextState().get(),
+ Optional.<String>absent());
+ break;
+
+ default:
+ throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+ }
 }
+ // Report whether the state machine accepted the requested transition.
+ return result.isSuccess();
 }
+ });
+
+ // Note: Delaying events until after the write operation is somewhat futile, since the state
+ // may actually not be written to durable store (e.g. if this is a nested transaction).
+ // Ideally, Storage would add a facility to attach side-effects that are performed after the
+ // outer-most transaction completes (meaning state has been durably persisted).
+ for (PubsubEvent event : events) {
+ eventSink.post(event);
 }
+
+ return success;
}
@Override
public void deleteTasks(final Set<String> taskIds) {
- storage.write(storage.new NoResultQuietSideEffectWork() {
+ storage.write(new MutateWork.NoResult.Quiet() {
@Override protected void execute(final MutableStoreProvider storeProvider) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
- addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
- taskStore.deleteTasks(taskIds);
+ eventSink.post(deleteTasks(storeProvider, taskIds));
}
});
}
- private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
- if (task != null) {
- return createStateMachine(task, task.getStatus());
- }
-
- // The task is unknown, not present in storage.
- TaskStateMachine stateMachine = new TaskStateMachine(
- taskId,
- null,
- workSink,
- clock,
- INIT);
- stateMachine.updateState(UNKNOWN);
- return stateMachine;
- }
-
- private TaskStateMachine createStateMachine(IScheduledTask task) {
- return createStateMachine(task, INIT);
- }
-
- private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
- return new TaskStateMachine(
- Tasks.id(task),
- task,
- workSink,
- clock,
- initialState);
+ private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+ TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+ taskStore.deleteTasks(taskIds);
+ return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..cd0899c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
*/
package org.apache.aurora.scheduler.state;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.twitter.common.base.Closure;
import com.twitter.common.base.Closures;
import com.twitter.common.base.Command;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
import com.twitter.common.util.StateMachine;
import com.twitter.common.util.StateMachine.Rule;
import com.twitter.common.util.StateMachine.Transition;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
/**
* State machine for a task.
* <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
* to different state transitions. These responses are externally communicated by populating a
* provided work queue.
* <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- * abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to force the one-time-use nature. This is probably best done
+ * by hiding the constructor and exposing only a static function to transition a task and get the
+ * resulting actions.
*/
class TaskStateMachine {
private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,183 +80,94 @@ class TaskStateMachine {
private static final AtomicLong ILLEGAL_TRANSITIONS =
Stats.exportLong("scheduler_illegal_task_state_transitions");
- // Re-declarations of statuses as wrapped state objects.
- private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
- private static final State FAILED = State.create(ScheduleStatus.FAILED);
- private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
- private static final State INIT = State.create(ScheduleStatus.INIT);
- private static final State KILLED = State.create(ScheduleStatus.KILLED);
- private static final State KILLING = State.create(ScheduleStatus.KILLING);
- private static final State LOST = State.create(ScheduleStatus.LOST);
- private static final State PENDING = State.create(ScheduleStatus.PENDING);
- private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
- private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
- private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
- private static final State STARTING = State.create(ScheduleStatus.STARTING);
- private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
- @VisibleForTesting
- static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
- new Supplier<String>() {
- @Override public String get() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- LOG.log(Level.SEVERE, "Failed to get self hostname.");
- throw Throwables.propagate(e);
- }
- }
- });
-
- private final String taskId;
- private final WorkSink workSink;
- private final StateMachine<State> stateMachine;
+ private final StateMachine<ScheduleStatus> stateMachine;
private ScheduleStatus previousState = null;
- private final Clock clock;
-
- /**
- * Composes a schedule status and a state change argument. Only the ScheduleStatuses in two
- * States must be equal for them to be considered equal.
- */
- private static class State {
- private final ScheduleStatus state;
- private final Function<IScheduledTask, IScheduledTask> mutation;
-
- State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
- this.state = state;
- this.mutation = mutation;
- }
-
- static State create(ScheduleStatus status) {
- return create(status, Functions.<IScheduledTask>identity());
- }
-
- static State create(
- ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation) {
-
- return new State(status, mutation);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof State)) {
- return false;
- }
-
- if (o == this) {
- return true;
- }
-
- State other = (State) o;
- return state == other.state;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(state)
- .toHashCode();
- }
- @Override
- public String toString() {
- return state.toString();
- }
-
- private ScheduleStatus getState() {
- return state;
- }
-
- private Function<IScheduledTask, IScheduledTask> getMutation() {
- return mutation;
- }
- }
+ private final Set<SideEffect> sideEffects = Sets.newHashSet();
/**
- * A write-only work acceptor.
+ * Creates a new task state machine representing a non-existent task. This allows for consistent
+ * state-reconciliation actions when the external system disagrees with the scheduler.
+ *
+ * @param name Name of the state machine, for logging.
*/
- public interface WorkSink {
- /**
- * Appends external work that must be performed for a state machine transition to be fully
- * complete.
- *
- * @param work Description of the work to be performed.
- * @param stateMachine The state machine that the work is associated with.
- * @param mutation Mutate operation to perform along with the state transition.
- */
- void addWork(
- WorkCommand work,
- TaskStateMachine stateMachine,
- Function<IScheduledTask, IScheduledTask> mutation);
+ public TaskStateMachine(String name) {
+ this(name, Optional.<IScheduledTask>absent());
}
/**
- * Creates a new task state machine.
- *
- * @param taskId ID of the task managed by this state machine.
+ * Creates a new task state machine representing an existent task. The state machine will be
+ * named with the tasks ID.
+ *.
* @param task Read-only task that this state machine manages.
- * @param workSink Work sink to receive transition response actions
- * @param clock Clock to use for reading the current time.
- * @param initialState The state to begin the state machine at. All legal transitions will be
- * added, but this allows the state machine to 'skip' states, for instance when a task is
- * loaded from a persistent store.
*/
- public TaskStateMachine(
- final String taskId,
- final IScheduledTask task,
- final WorkSink workSink,
- final Clock clock,
- final ScheduleStatus initialState) {
-
- this.taskId = MorePreconditions.checkNotBlank(taskId);
- this.workSink = checkNotNull(workSink);
- this.clock = checkNotNull(clock);
- checkNotNull(initialState);
-
- @SuppressWarnings("unchecked")
- Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
- /* Kill a task that we believe to be terminated when an attempt is made to revive. */
- Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
- addWorkClosure(WorkCommand.KILL)),
- /* Remove a terminated task that is remotely removed. */
- Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
- final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case ASSIGNED:
- case STARTING:
- case RUNNING:
- addWork(WorkCommand.KILL);
- break;
-
- case LOST:
- addWork(WorkCommand.KILL);
- // fall through
-
- case FINISHED:
- case FAILED:
- case KILLED:
- addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
- break;
-
- case UNKNOWN:
- updateState(ScheduleStatus.LOST);
- break;
-
- default:
- // No-op.
- }
- }
- };
+ public TaskStateMachine(IScheduledTask task) {
+ this(Tasks.id(task), Optional.of(task));
+ }
+
+ private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+ MorePreconditions.checkNotBlank(name);
+ checkNotNull(task);
+
+ final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+ Preconditions.checkState((initialState == UNKNOWN) == !task.isPresent());
+
+ Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+ ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+ // Kill a task that we believe to be terminated when an attempt is made to revive.
+ .add(Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING), addWorkClosure(KILL)))
+ // Remove a terminated task that is remotely removed.
+ .add(Closures.filter(Transition.to(UNKNOWN), addWorkClosure(DELETE)))
+ .build());
+
+ final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
+ case ASSIGNED:
+ addFollowup(KILL);
+ break;
+
+ case STARTING:
+ addFollowup(KILL);
+ break;
+
+ case RUNNING:
+ addFollowup(KILL);
+ break;
+
+ case LOST:
+ addFollowup(KILL);
+ addFollowup(RESCHEDULE);
+ break;
+
+ case FINISHED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case FAILED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case KILLED:
+ addFollowup(RESCHEDULE);
+ break;
+
+ case UNKNOWN:
+ addFollowupTransition(LOST);
+ break;
+
+ default:
+ // No-op.
+ }
+ }
+ };
// To be called on a task transitioning into the FINISHED state.
final Command rescheduleIfService = new Command() {
@Override public void execute() {
- if (task.getAssignedTask().getTask().isIsService()) {
- addWork(WorkCommand.RESCHEDULE);
+ if (task.get().getAssignedTask().getTask().isIsService()) {
+ addFollowup(RESCHEDULE);
}
}
};
@@ -248,24 +175,26 @@ class TaskStateMachine {
// To be called on a task transitioning into the FAILED state.
final Command incrementFailuresMaybeReschedule = new Command() {
@Override public void execute() {
- addWork(WorkCommand.INCREMENT_FAILURES);
+ addFollowup(INCREMENT_FAILURES);
// Max failures is ignored for service task.
- boolean isService = task.getAssignedTask().getTask().isIsService();
+ boolean isService = task.get().getAssignedTask().getTask().isIsService();
// Max failures is ignored when set to -1.
- int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
- if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
- addWork(WorkCommand.RESCHEDULE);
+ int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+ boolean belowMaxFailures =
+ (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+ if (isService || belowMaxFailures) {
+ addFollowup(RESCHEDULE);
} else {
- LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+ LOG.info("Task " + name + " reached failure limit, not rescheduling");
}
}
};
- stateMachine = StateMachine.<State>builder(taskId)
+ stateMachine = StateMachine.<ScheduleStatus>builder(name)
.logTransitions()
- .initialState(State.create(initialState))
+ .initialState(initialState)
.addState(
Rule.from(INIT)
.to(PENDING, UNKNOWN))
@@ -273,11 +202,11 @@ class TaskStateMachine {
Rule.from(PENDING)
.to(ASSIGNED, KILLING)
.withCallback(
- new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case KILLING:
- addWork(WorkCommand.DELETE);
+ addFollowup(DELETE);
break;
default:
@@ -291,16 +220,15 @@ class TaskStateMachine {
.to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
KILLING, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -308,25 +236,24 @@ class TaskStateMachine {
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
- // fall through
- case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(RESCHEDULE);
+ addFollowup(KILL);
break;
- case UNKNOWN:
+ case KILLING:
+ addFollowup(KILL);
break;
- default:
- // No-op.
+ default:
+ // No-op.
}
}
}
@@ -335,20 +262,19 @@ class TaskStateMachine {
Rule.from(STARTING)
.to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -356,25 +282,25 @@ class TaskStateMachine {
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case UNKNOWN:
// The slave previously acknowledged that it had the task, and now
// stopped reporting it.
- updateState(ScheduleStatus.LOST);
+ addFollowupTransition(LOST);
break;
- default:
- // No-op.
+ default:
+ // No-op.
}
}
}
@@ -383,20 +309,19 @@ class TaskStateMachine {
Rule.from(RUNNING)
.to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
.withCallback(
- new Closure<Transition<State>>() {
- @SuppressWarnings("fallthrough")
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> transition) {
+ switch (transition.getTo()) {
case FINISHED:
rescheduleIfService.execute();
break;
case PREEMPTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case RESTARTING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case FAILED:
@@ -404,19 +329,19 @@ class TaskStateMachine {
break;
case KILLED:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case KILLING:
- addWork(WorkCommand.KILL);
+ addFollowup(KILL);
break;
case LOST:
- addWork(WorkCommand.RESCHEDULE);
+ addFollowup(RESCHEDULE);
break;
case UNKNOWN:
- updateState(ScheduleStatus.LOST);
+ addFollowupTransition(LOST);
break;
default:
@@ -460,23 +385,25 @@ class TaskStateMachine {
// Since we want this action to be performed last in the transition sequence, the callback
// must be the last chained transition callback.
.onAnyTransition(
- new Closure<Transition<State>>() {
- @Override public void execute(final Transition<State> transition) {
- ScheduleStatus from = transition.getFrom().getState();
- ScheduleStatus to = transition.getTo().getState();
-
- if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
- // Prevent an update when killing a pending task, since the task is deleted
- // prior to the update.
- && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
- addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
- } else if (!transition.isAllowed()) {
- LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
- ILLEGAL_TRANSITIONS.incrementAndGet();
- }
-
+ new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(final Transition<ScheduleStatus> transition) {
if (transition.isValidStateChange()) {
+ ScheduleStatus from = transition.getFrom();
+ ScheduleStatus to = transition.getTo();
+
+ // TODO(wfarner): Clean up this hack. This is here to suppress unnecessary work
+ // (save followed by delete), but it shows a wart with this catch-all behavior.
+ // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+ boolean pendingDeleteHack = !((from == PENDING) && (to == KILLING));
+
+ // Don't bother saving state of a task that is being removed.
+ if ((to != UNKNOWN) && pendingDeleteHack) {
+ addFollowup(SAVE_STATE);
+ }
previousState = from;
+ } else {
+ LOG.severe("Illegal state transition attempted: " + transition);
+ ILLEGAL_TRANSITIONS.incrementAndGet();
}
}
}
@@ -490,56 +417,67 @@ class TaskStateMachine {
.build();
}
- private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
- return new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> item) {
- addWork(work);
- }
- };
+ private void addFollowup(Action action) {
+ addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
}
- private void addWork(WorkCommand work) {
- addWork(work, Functions.<IScheduledTask>identity());
+ private void addFollowupTransition(ScheduleStatus status) {
+ addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
}
- private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
- LOG.info("Adding work command " + work + " for " + this);
- workSink.addWork(work, TaskStateMachine.this, mutation);
+ private void addFollowup(SideEffect action) {
+ LOG.info("Adding work command " + action + " for " + this);
+ sideEffects.add(action);
}
- /**
- * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status) {
- return updateState(status, Functions.<IScheduledTask>identity());
+ private Closure<Transition<ScheduleStatus>> addWorkClosure(final Action action) {
+ return new Closure<Transition<ScheduleStatus>>() {
+ @Override public void execute(Transition<ScheduleStatus> item) {
+ addFollowup(action);
+ }
+ };
}
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
- *
- * @param status Status to apply to the task.
- * @param auditMessage The (optional) audit message to associate with the transition.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
- return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
- }
+ public static class TransitionResult {
+ private final boolean success;
+ private final Set<SideEffect> sideEffects;
- /**
- * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
- *
- * @param status Status to apply to the task.
- * @param mutation Mutate operation to perform while updating the task.
- * @return {@code true} if the state change was allowed, {@code false} otherwise.
- */
- public synchronized boolean updateState(
- ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation) {
+ public TransitionResult(boolean success, Set<SideEffect> sideEffects) {
+ this.success = success;
+ this.sideEffects = Preconditions.checkNotNull(sideEffects);
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public Set<SideEffect> getSideEffects() {
+ return sideEffects;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TransitionResult)) {
+ return false;
+ }
+
+ TransitionResult other = (TransitionResult) o;
+ return (success == other.success)
+ && Objects.equal(sideEffects, other.sideEffects);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(success, sideEffects);
+ }
- return updateState(status, mutation, Optional.<String>absent());
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("success", success)
+ .add("sideEffects", sideEffects)
+ .toString();
+ }
}
/**
@@ -548,59 +486,35 @@ class TaskStateMachine {
* will be appended to the work queue.
*
* @param status Status to apply to the task.
- * @param auditMessage The audit message to associate with the transition.
- * @param mutation Mutate operation to perform while updating the task.
* @return {@code true} if the state change was allowed, {@code false} otherwise.
*/
- public synchronized boolean updateState(
- final ScheduleStatus status,
- Function<IScheduledTask, IScheduledTask> mutation,
- final Optional<String> auditMessage) {
-
+ public synchronized TransitionResult updateState(final ScheduleStatus status) {
checkNotNull(status);
- checkNotNull(mutation);
- checkNotNull(auditMessage);
+ Preconditions.checkState(sideEffects.isEmpty());
/**
* Don't bother applying noop state changes. If we end up modifying task state without a
* state transition (e.g. storing resource consumption of a running task), we need to find
* a different way to suppress noop transitions.
*/
- if (stateMachine.getState().getState() != status) {
- Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
- new Function<IScheduledTask, IScheduledTask>() {
- @Override public IScheduledTask apply(IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- builder.addToTaskEvents(new TaskEvent()
- .setTimestamp(clock.nowMillis())
- .setStatus(status)
- .setMessage(auditMessage.orNull())
- .setScheduler(LOCAL_HOST_SUPPLIER.get()));
- return IScheduledTask.build(builder);
- }
- });
- return stateMachine.transition(State.create(status, operation));
+ if (stateMachine.getState() == status) {
+ return new TransitionResult(false, ImmutableSet.<SideEffect>of());
}
- return false;
+ boolean success = stateMachine.transition(status);
+ Set<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+ sideEffects.clear();
+ return new TransitionResult(success, transitionEffects);
}
/**
* Fetch the current state from the state machine.
+ * TODO(wfarner): Consider removing, the caller should know this.
*
* @return The current state.
*/
public synchronized ScheduleStatus getState() {
- return stateMachine.getState().getState();
- }
-
- /**
- * Gets the ID for the task that this state machine manages.
- *
- * @return The state machine's task ID.
- */
- public String getTaskId() {
- return taskId;
+ return stateMachine.getState();
}
/**
@@ -616,6 +530,6 @@ class TaskStateMachine {
@Override
public String toString() {
- return getTaskId();
+ return stateMachine.getName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
- // Send an instruction for the runner of this task to kill the task.
- KILL,
- // Create a new state machine with a copy of this task.
- RESCHEDULE,
- // Update the task's state (schedule status) in the persistent store to match the state machine.
- UPDATE_STATE,
- // Delete this task from the persistent store.
- DELETE,
- // Increment the failure count for this task.
- INCREMENT_FAILURES
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..26468ce 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -200,6 +200,10 @@ public interface Storage {
/**
* Executes the unit of mutating {@code work}.
+ * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+ * performed after completion of the outer-most transaction. As it stands, it's somewhat
+ * futile to try to achieve this within a transaction, since the local code does not know
+ * if the current transaction is nested.
*
* @param work The unit of work to execute.
* @param <T> The type of result this unit of work produces.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
/**
* Offers temporary mutable access to tasks. If a task ID is not found, it will be silently
* skipped, and no corresponding task will be returned.
+ * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
+ * and it prevents the caller from worrying about a bad query having broad impact.
*
* @param query Query to match tasks against.
* @param mutator The mutate operation.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b93e47f..af20e82 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -40,6 +41,7 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskRescheduled;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.storage.Storage;
@@ -50,14 +52,14 @@ import org.apache.mesos.Protos.SlaveID;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IArgumentMatcher;
-import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
@@ -65,6 +67,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class StateManagerImplTest extends EasyMockTest {
@@ -90,11 +93,6 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
}
- @After
- public void validateCompletion() {
- assertTrue(stateManager.getStorage().getEvents().isEmpty());
- }
-
private static class StateChangeMatcher implements IArgumentMatcher {
private final String taskId;
private final ScheduleStatus from;
@@ -171,7 +169,7 @@ public class StateManagerImplTest extends EasyMockTest {
.setStatus(PENDING)
.setTaskEvents(ImmutableList.of(new TaskEvent()
.setTimestamp(clock.nowMillis())
- .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+ .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
.setStatus(PENDING)))
.setAssignedTask(new AssignedTask()
.setInstanceId(3)
@@ -265,6 +263,87 @@ public class StateManagerImplTest extends EasyMockTest {
changeState(unknownTask, RUNNING);
}
+ @Test
+ public void testIncrementFailureCount() {
+ ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+ // Failing this service task is expected to create replacement "a2" in PENDING and post TaskRescheduled.
+ String taskId2 = "a2";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+ expectStateTransitions(taskId2, INIT, PENDING);
+ eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+ control.replay();
+
+ insertTask(task, 0);
+ // Drive the task RUNNING -> FAILED; the replacement should then report a failure count of 1.
+ assignTask(taskId, HOST_A);
+ changeState(taskId, RUNNING);
+ changeState(taskId, FAILED);
+ IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+ Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+ assertEquals(1, rescheduledTask.getFailureCount());
+ }
+
+ @Test
+ public void testDoubleTransition() {
+ // Tests that a transition inducing another transition (STATE_CHANGE action) is performed.
+ // Here, reporting UNKNOWN for a RUNNING task is expected to leave it in LOST.
+ ITaskConfig task = makeTask(JIM, MY_JOB);
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+ // A replacement task "a2" is expected to be created in PENDING, with a TaskRescheduled event.
+ String taskId2 = "a2";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+ expectStateTransitions(taskId2, INIT, PENDING);
+ eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+ control.replay();
+
+ insertTask(task, 0);
+ // No assert statements: the only checks are the mock expectations recorded before replay().
+ assignTask(taskId, HOST_A);
+ changeState(taskId, RUNNING);
+ changeState(taskId, UNKNOWN);
+ }
+
+ @Test
+ public void testCasTaskPresent() {
+ ITaskConfig task = makeTask(JIM, MY_JOB);
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+ // No RUNNING transition is expected: the PENDING-guarded change below must be rejected.
+ control.replay();
+ // Compare-and-set: changeState applies only when the task's current status matches the expected one.
+ insertTask(task, 0);
+ assignTask(taskId, HOST_A);
+ assertFalse(stateManager.changeState(
+ taskId,
+ Optional.of(PENDING),
+ RUNNING,
+ Optional.<String>absent()));
+ assertTrue(stateManager.changeState(
+ taskId,
+ Optional.of(ASSIGNED),
+ FAILED,
+ Optional.<String>absent()));
+ }
+
+ @Test
+ public void testCasTaskNotFound() {
+ control.replay();
+ // Nothing was inserted, so a compare-and-set change on task "a" must report false.
+ assertFalse(stateManager.changeState(
+ "a",
+ Optional.of(PENDING),
+ ASSIGNED,
+ Optional.<String>absent()));
+ }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,