You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/14 22:52:26 UTC
git commit: Begin cleanup of StateManager interface by removing state
change via Query.
Updated Branches:
refs/heads/master 730cff698 -> a6ab7fdb7
Begin cleanup of StateManager interface by removing state change via Query.
Changing state via a broad query is a rarely-used feature, and is probably
not a good idea in most cases. The query-based API was useful because it sometimes saves
code at the call site, but the big downside is that it makes it more difficult
for StateManager to perform course-correcting operations.
The test case added to StateManagerImplTest illustrates a now-fixed regression
that could contribute to state drift between the scheduler and the outside
world.
Bugs closed: AURORA-27
Reviewed at https://reviews.apache.org/r/16767/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a6ab7fdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a6ab7fdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a6ab7fdb
Branch: refs/heads/master
Commit: a6ab7fdb7b6b4dfe724f47a58633526a87f34e8f
Parents: 730cff6
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 14 13:52:17 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 14 13:52:17 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/UserTaskLauncher.java | 4 +-
.../aurora/scheduler/async/TaskScheduler.java | 8 +--
.../aurora/scheduler/async/TaskThrottler.java | 4 +-
.../aurora/scheduler/async/TaskTimeout.java | 9 ++-
.../scheduler/state/MaintenanceController.java | 21 +++---
.../aurora/scheduler/state/SchedulerCore.java | 4 +-
.../scheduler/state/SchedulerCoreImpl.java | 60 +++++++++++-----
.../aurora/scheduler/state/StateManager.java | 32 ++++++---
.../scheduler/state/StateManagerImpl.java | 76 ++++++--------------
.../thrift/SchedulerThriftInterface.java | 3 +-
.../aurora/scheduler/UserTaskLauncherTest.java | 19 +++--
.../scheduler/async/TaskSchedulerTest.java | 5 +-
.../scheduler/async/TaskThrottlerTest.java | 6 +-
.../aurora/scheduler/async/TaskTimeoutTest.java | 26 +++++--
.../state/BaseSchedulerCoreImplTest.java | 4 +-
.../state/MaintenanceControllerImplTest.java | 7 +-
.../scheduler/state/StateManagerImplTest.java | 31 +++++---
.../thrift/SchedulerThriftInterfaceTest.java | 2 +-
18 files changed, 181 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 7e180ac..010776e 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.mesos.Protos.Offer;
@@ -85,7 +84,8 @@ class UserTaskLauncher implements TaskLauncher {
}
stateManager.changeState(
- Query.taskScoped(status.getTaskId().getValue()),
+ status.getTaskId().getValue(),
+ Optional.<ScheduleStatus>absent(),
translatedState,
Optional.fromNullable(message));
} catch (SchedulerException e) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 96c76ba..4afc332 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -161,9 +161,9 @@ interface TaskScheduler extends EventSubscriber {
return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
@Override public TaskSchedulerResult apply(MutableStoreProvider store) {
LOG.fine("Attempting to schedule task " + taskId);
- Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
- final IScheduledTask task =
- Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
+ final IScheduledTask task = Iterables.getOnlyElement(
+ store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+ null);
if (task == null) {
LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
} else {
@@ -182,7 +182,7 @@ interface TaskScheduler extends EventSubscriber {
// It is in the LOST state and a new task will move to PENDING to replace it.
// Should the state change fail due to storage issues, that's okay. The task will
// time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
+ stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
index 11a01ea..fd6c188 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -25,7 +25,6 @@ import com.google.common.eventbus.Subscribe;
import com.twitter.common.stats.SlidingStats;
import com.twitter.common.util.Clock;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -75,7 +74,8 @@ class TaskThrottler implements EventSubscriber {
new Runnable() {
@Override public void run() {
stateManager.changeState(
- Query.taskScoped(stateChange.getTaskId()).byStatus(THROTTLED),
+ stateChange.getTaskId(),
+ Optional.of(THROTTLED),
PENDING,
Optional.<String>absent());
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 9f14a99..64a1941 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -42,7 +42,6 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.Clock;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
@@ -186,9 +185,13 @@ class TaskTimeout implements EventSubscriber {
// timeout is still valid. Ideally, the future would have already been canceled, but in the
// event of a state transition race, including transientState prevents an unintended
// task timeout.
- Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
// Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
+ if (stateManager.changeState(
+ key.taskId,
+ Optional.of(key.status),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE)) {
+
timedOutTasks.incrementAndGet();
} else {
LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3dd2271..af10d44 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
-import com.twitter.common.base.Closure;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.HostStatus;
@@ -119,11 +118,7 @@ public interface MaintenanceController {
this.eventSink = checkNotNull(eventSink);
}
- private Set<HostStatus> watchDrainingTasks(
- MutableStoreProvider store,
- Set<String> hosts,
- Closure<Query.Builder> callback) {
-
+ private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
Set<String> emptyHosts = Sets.newHashSet();
for (String host : hosts) {
// If there are no tasks on the host, immediately transition to DRAINED.
@@ -134,7 +129,13 @@ public interface MaintenanceController {
if (activeTasks.isEmpty()) {
emptyHosts.add(host);
} else {
- callback.execute(query);
+ for (String taskId : activeTasks) {
+ stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.RESTARTING,
+ DRAINING_MESSAGE);
+ }
}
}
@@ -186,11 +187,7 @@ public interface MaintenanceController {
public Set<HostStatus> drain(final Set<String> hosts) {
return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
@Override public Set<HostStatus> apply(MutableStoreProvider store) {
- return watchDrainingTasks(store, hosts, new Closure<Query.Builder>() {
- @Override public void execute(Query.Builder query) {
- stateManager.changeState(query, ScheduleStatus.RESTARTING, DRAINING_MESSAGE);
- }
- });
+ return watchDrainingTasks(store, hosts);
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index b89d990..7a88629 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -83,11 +83,11 @@ public interface SchedulerCore {
/**
* Assigns a new state to tasks.
*
- * @param query Builder for a query to identify tasks
+ * @param taskId ID of the task to transition.
* @param status The new state of the tasks.
* @param message Additional information about the state transition.
*/
- void setTaskStatus(Query.Builder query, ScheduleStatus status, Optional<String> message);
+ void setTaskStatus(String taskId, ScheduleStatus status, Optional<String> message);
/**
* Kills a specific set of tasks.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 1d450f2..8dec283 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -75,9 +75,8 @@ class SchedulerCoreImpl implements SchedulerCore {
// Schedulers that are responsible for triggering execution of jobs.
private final ImmutableList<JobManager> jobManagers;
- // TODO(Bill Farner): Avoid using StateManagerImpl.
// State manager handles persistence of task modifications and state transitions.
- private final StateManagerImpl stateManager;
+ private final StateManager stateManager;
private final TaskIdGenerator taskIdGenerator;
private final JobFilter jobFilter;
@@ -97,7 +96,7 @@ class SchedulerCoreImpl implements SchedulerCore {
Storage storage,
CronJobManager cronScheduler,
ImmediateJobManager immediateScheduler,
- StateManagerImpl stateManager,
+ StateManager stateManager,
TaskIdGenerator taskIdGenerator,
JobFilter jobFilter) {
@@ -118,7 +117,9 @@ class SchedulerCoreImpl implements SchedulerCore {
@Override
public synchronized void tasksDeleted(Set<String> taskIds) {
- setTaskStatus(Query.taskScoped(taskIds), ScheduleStatus.UNKNOWN, Optional.<String>absent());
+ for (String taskId : taskIds) {
+ setTaskStatus(taskId, ScheduleStatus.UNKNOWN, Optional.<String>absent());
+ }
}
@Override
@@ -245,18 +246,20 @@ class SchedulerCoreImpl implements SchedulerCore {
@Override
public synchronized void setTaskStatus(
- Query.Builder query,
+ String taskId,
final ScheduleStatus status,
Optional<String> message) {
- checkNotNull(query);
+ checkNotNull(taskId);
checkNotNull(status);
- stateManager.changeState(query, status, message);
+ stateManager.changeState(taskId, Optional.<ScheduleStatus>absent(), status, message);
}
@Override
- public synchronized void killTasks(Query.Builder query, String user) throws ScheduleException {
+ public synchronized void killTasks(Query.Builder query, final String user)
+ throws ScheduleException {
+
checkNotNull(query);
LOG.info("Killing tasks matching " + query);
@@ -274,10 +277,28 @@ class SchedulerCoreImpl implements SchedulerCore {
}
// Unless statuses were specifically supplied, only attempt to kill active tasks.
- Query.Builder taskQuery = query.get().isSetStatuses() ? query.byStatus(ACTIVE_STATES) : query;
+ final Query.Builder taskQuery = query.get().isSetStatuses()
+ ? query.byStatus(ACTIVE_STATES)
+ : query;
+
+ int tasksAffected = storage.write(new MutateWork.Quiet<Integer>() {
+ @Override public Integer apply(MutableStoreProvider storeProvider) {
+ int total = 0;
+ for (String taskId : Tasks.ids(storeProvider.getTaskStore().fetchTasks(taskQuery))) {
+ boolean changed = stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ KILLING,
+ Optional.of("Killed by " + user));
+
+ if (changed) {
+ total++;
+ }
+ }
+ return total;
+ }
+ });
- int tasksAffected =
- stateManager.changeState(taskQuery, KILLING, Optional.of("Killed by " + user));
if (!jobDeleted && (tasksAffected == 0)) {
throw new ScheduleException("No jobs to kill");
}
@@ -307,22 +328,27 @@ class SchedulerCoreImpl implements SchedulerCore {
throw new ScheduleException("Not all requested shards are active.");
}
LOG.info("Restarting shards matching " + query);
- stateManager.changeState(
- Query.taskScoped(Tasks.ids(matchingTasks)),
- RESTARTING,
- Optional.of("Restarted by " + requestingUser));
+ for (String taskId : Tasks.ids(matchingTasks)) {
+ stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ RESTARTING,
+ Optional.of("Restarted by " + requestingUser));
+ }
}
});
}
-
@Override
public synchronized void preemptTask(IAssignedTask task, IAssignedTask preemptingTask) {
checkNotNull(task);
checkNotNull(preemptingTask);
// TODO(William Farner): Throw SchedulingException if either task doesn't exist, etc.
- stateManager.changeState(Query.taskScoped(task.getTaskId()), ScheduleStatus.PREEMPTING,
+ stateManager.changeState(
+ task.getTaskId(),
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.PREEMPTING,
Optional.of("Preempting in favor of " + preemptingTask.getTaskId()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 045165d..4249707 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -21,27 +21,39 @@ import java.util.Set;
import com.google.common.base.Optional;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos.SlaveID;
/**
- * Thin interface for the state manager.
+ * A manager for the state of tasks. Most modifications to tasks should be made here, especially
+ * those that alter the {@link ScheduleStatus} of tasks.
*/
public interface StateManager {
/**
- * Performs a simple state change, transitioning all tasks matching a query to the given
- * state and applying the given audit message.
- * TODO(William Farner): Consider removing the return value.
+ * Attempts to alter a task from its existing state to {@code newState}. If a {@code casState}
+ * (compare and swap) is provided, the transition will only be performed if the task is currently
+ * in that state.
+ *
+ * @param taskId ID of the task to transition.
+ * @param casState State that the task must be in for the operation to proceed. If the task
+ * is found to not be in {@code casState}, no action is performed and
+ * {@code false} is returned. This can be useful when deferring asynchronous
+ * work, to perform a follow-up action iff the task has not changed since the
+ * decision to defer the action was made.
+ * @param newState State to move the task to.
+ * @param auditMessage Message to include with the transition.
+ * @return {@code true} if the transition was performed and the task was moved to
+ * {@code newState}, {@code false} if the transition was not allowed, or the task was not
+ * in {@code casState}.
*
- * @param query Builder of the query to perform, the results of which will be modified.
- * @param newState State to move the resulting tasks into.
- * @param auditMessage Audit message to apply along with the state change.
- * @return the number of successful state changes.
*/
- int changeState(Query.Builder query, ScheduleStatus newState, Optional<String> auditMessage);
+ boolean changeState(
+ String taskId,
+ Optional<ScheduleStatus> casState,
+ ScheduleStatus newState,
+ Optional<String> auditMessage);
/**
* Assigns a task to a specific slave.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 256f830..2b8ca09 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,8 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -54,8 +52,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -78,8 +74,9 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
* Manager of all persistence-related operations for the scheduler. Acts as a controller for
* persisted state machine transitions, and their side-effects.
*
- * TODO(William Farner): Re-evaluate thread safety here, specifically risk of races that
- * modify managerState.
+ * TODO(wfarner): This class is due for an overhaul. There are several aspects of it that could
+ * probably be made much simpler. Specifically, the workQueue is particularly difficult to reason
+ * about.
*/
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
@@ -199,13 +196,15 @@ public class StateManagerImpl implements StateManager {
}
@Override
- public int changeState(
- Query.Builder query,
+ public boolean changeState(
+ String taskId,
+ Optional<ScheduleStatus> casState,
final ScheduleStatus newState,
final Optional<String> auditMessage) {
- return changeState(query, new Function<TaskStateMachine, Boolean>() {
- @Override public Boolean apply(TaskStateMachine stateMachine) {
+ return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
+ @Override
+ public Boolean apply(TaskStateMachine stateMachine) {
return stateMachine.updateState(newState, auditMessage);
}
});
@@ -223,34 +222,26 @@ public class StateManagerImpl implements StateManager {
checkNotNull(assignedPorts);
TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
- changeState(Query.taskScoped(taskId), mutation);
+ changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
return mutation.getAssignedTask();
}
- private int changeStateInWriteOperation(
- Set<String> taskIds,
- Function<TaskStateMachine, Boolean> stateChange) {
-
- int count = 0;
- for (TaskStateMachine stateMachine : getStateMachines(taskIds).values()) {
- if (stateChange.apply(stateMachine)) {
- ++count;
- }
- }
- return count;
- }
-
- private int changeState(
- final Query.Builder query,
+ private boolean changeState(
+ final String taskId,
+ final Optional<ScheduleStatus> casState,
final Function<TaskStateMachine, Boolean> stateChange) {
- return storage.write(storage.new QuietSideEffectWork<Integer>() {
- @Override public Integer apply(MutableStoreProvider storeProvider) {
- Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- return changeStateInWriteOperation(ids, stateChange);
+ return storage.write(storage.new QuietSideEffectWork<Boolean>() {
+ @Override public Boolean apply(MutableStoreProvider storeProvider) {
+ IScheduledTask task = Iterables.getOnlyElement(
+ storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+ null);
+ if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
+ return false;
+ }
+
+ return stateChange.apply(getStateMachine(taskId, task));
}
});
}
@@ -416,27 +407,6 @@ public class StateManagerImpl implements StateManager {
});
}
- private Map<String, TaskStateMachine> getStateMachines(final Set<String> taskIds) {
- return storage.consistentRead(new Work.Quiet<Map<String, TaskStateMachine>>() {
- @Override public Map<String, TaskStateMachine> apply(StoreProvider storeProvider) {
- Map<String, IScheduledTask> existingTasks = Maps.uniqueIndex(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
- new Function<IScheduledTask, String>() {
- @Override public String apply(IScheduledTask input) {
- return input.getAssignedTask().getTaskId();
- }
- });
-
- ImmutableMap.Builder<String, TaskStateMachine> builder = ImmutableMap.builder();
- for (String taskId : taskIds) {
- // Pass null get() values through.
- builder.put(taskId, getStateMachine(taskId, existingTasks.get(taskId)));
- }
- return builder.build();
- }
- });
- }
-
private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
if (task != null) {
return createStateMachine(task, task.getStatus());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index c1a11bd..76caa62 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -672,8 +672,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
return response;
}
- schedulerCore.setTaskStatus(
- Query.taskScoped(taskId), status, transitionMessage(context.getIdentity()));
+ schedulerCore.setTaskStatus(taskId, status, transitionMessage(context.getIdentity()));
return new Response().setResponseCode(OK).setMessage("Transition attempted.");
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index bba1b72..ffc37db 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Iterables;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.OfferQueue;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -88,9 +88,12 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testForwardsStatusUpdates() throws Exception {
- expect(
- stateManager.changeState(Query.taskScoped(TASK_ID_A), RUNNING, Optional.of("fake message")))
- .andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
+ RUNNING,
+ Optional.of("fake message")))
+ .andReturn(true);
control.replay();
@@ -114,7 +117,8 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test(expected = StorageException.class)
public void testFailedStatusUpdate() throws Exception {
expect(stateManager.changeState(
- Query.taskScoped(TASK_ID_A),
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
RUNNING,
Optional.of("fake message")))
.andThrow(new StorageException("Injected error"));
@@ -132,10 +136,11 @@ public class UserTaskLauncherTest extends EasyMockTest {
@Test
public void testMemoryLimitTranslationHack() throws Exception {
expect(stateManager.changeState(
- Query.taskScoped(TASK_ID_A),
+ TASK_ID_A,
+ Optional.<ScheduleStatus>absent(),
FAILED,
Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY)))
- .andReturn(0);
+ .andReturn(false);
control.replay();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 4dfac03..6c124c5 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -317,10 +317,11 @@ public class TaskSchedulerTest extends EasyMockTest {
driver.launchTask(OFFER_A.getId(), mesosTask);
expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
expect(stateManager.changeState(
- Query.taskScoped("a").byStatus(PENDING),
+ "a",
+ Optional.of(PENDING),
LOST,
TaskSchedulerImpl.LAUNCH_FAILED_MSG))
- .andReturn(1);
+ .andReturn(true);
replayAndCreateScheduler();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
index 66bc2a0..a539b59 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
@@ -29,7 +29,6 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
@@ -119,10 +118,11 @@ public class TaskThrottlerTest extends EasyMockTest {
private void expectMovedToPending(IScheduledTask task) {
expect(stateManager.changeState(
- Query.taskScoped(Tasks.id(task)).byStatus(THROTTLED),
+ Tasks.id(task),
+ Optional.of(THROTTLED),
PENDING,
Optional.<String>absent()))
- .andReturn(1);
+ .andReturn(true);
}
private IScheduledTask makeTask(String id, ScheduleStatus status) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 023905b..d4685f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
@@ -35,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -175,8 +175,12 @@ public class TaskTimeoutTest extends EasyMockTest {
expectTaskWatch();
expectCancel();
Capture<Runnable> killingTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(KILLING),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(true);
replayAndCreate();
@@ -188,8 +192,12 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testTimeout() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(ASSIGNED);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(1);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(ASSIGNED),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(true);
replayAndCreate();
@@ -202,8 +210,12 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testTaskDeleted() throws Exception {
Capture<Runnable> assignedTimeout = expectTaskWatch();
- Query.Builder query = Query.taskScoped(TASK_ID).byStatus(KILLING);
- expect(stateManager.changeState(query, LOST, TaskTimeout.TIMEOUT_MESSAGE)).andReturn(0);
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.of(KILLING),
+ LOST,
+ TaskTimeout.TIMEOUT_MESSAGE))
+ .andReturn(false);
replayAndCreate();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index cc929f8..4eeed38 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -1453,7 +1453,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
ScheduleStatus status,
Optional<String> message) {
- scheduler.setTaskStatus(query, status, message);
+ for (String taskId : Tasks.ids(Storage.Util.consistentFetchTasks(storage, query))) {
+ scheduler.setTaskStatus(taskId, status, message);
+ }
}
public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index c0addd6..f7f7632 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -33,6 +33,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -55,7 +56,6 @@ import static org.junit.Assert.assertEquals;
public class MaintenanceControllerImplTest extends EasyMockTest {
private static final String HOST_A = "a";
- private static final String HOST_B = "b";
private static final Set<String> A = ImmutableSet.of(HOST_A);
private StorageTestUtil storageUtil;
@@ -102,10 +102,11 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
expectMaintenanceModeChange(HOST_A, SCHEDULED);
expectFetchTasksByHost(HOST_A, ImmutableSet.<ScheduledTask>of(task));
expect(stateManager.changeState(
- Query.slaveScoped(HOST_A).active(),
+ Tasks.id(task),
+ Optional.<ScheduleStatus>absent(),
ScheduleStatus.RESTARTING,
MaintenanceControllerImpl.DRAINING_MESSAGE))
- .andReturn(1);
+ .andReturn(true);
expectMaintenanceModeChange(HOST_A, DRAINING);
expect(storageUtil.attributeStore.getHostAttributes(HOST_A))
.andReturn(Optional.of(new HostAttributes().setHost(HOST_A).setMode(DRAINING)));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 74ec74f..5379300 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -202,8 +202,8 @@ public class StateManagerImplTest extends EasyMockTest {
control.replay();
insertTask(task, 0);
- assertEquals(1, changeState(taskId, KILLING));
- assertEquals(0, changeState(taskId, KILLING));
+ assertEquals(true, changeState(taskId, KILLING));
+ assertEquals(false, changeState(taskId, KILLING));
}
@Test
@@ -228,17 +228,15 @@ public class StateManagerImplTest extends EasyMockTest {
@Test
public void testNestedEvents() {
- String id = "a";
+ final String id = "a";
ITaskConfig task = makeTask(JIM, MY_JOB);
expect(taskIdGenerator.generate(task, 0)).andReturn(id);
// Trigger an event that produces a side-effect and a PubSub event .
eventSink.post(matchStateChange(id, INIT, PENDING));
expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override
- public Void answer() throws Throwable {
- stateManager.changeState(
- Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
+ @Override public Void answer() throws Throwable {
+ changeState(id, ASSIGNED);
return null;
}
});
@@ -285,6 +283,17 @@ public class StateManagerImplTest extends EasyMockTest {
changeState(taskId, FAILED);
}
+ @Test
+ public void testKillUnknownTask() { // State change for a task ID never inserted in this test.
+ String unknownTask = "unknown";
+
+ driver.killTask(unknownTask); // Recorded expectation: the driver is asked to kill it.
+
+ control.replay();
+
+ changeState(unknownTask, RUNNING); // Should trigger the killTask expectation above.
+ }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,
@@ -311,8 +320,12 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager.insertPendingTasks(ImmutableMap.of(instanceId, task));
}
- private int changeState(String taskId, ScheduleStatus status) {
- return stateManager.changeState(Query.taskScoped(taskId), status, Optional.<String>absent());
+ private boolean changeState(String taskId, ScheduleStatus status) { // Returns StateManager's boolean result.
+ return stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(), // No expected current status (presumably unconditional).
+ status,
+ Optional.<String>absent()); // No message attached to the transition.
}
private static ITaskConfig makeTask(Identity owner, String job) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a6ab7fdb/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 91c1c24..b46f29a 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -474,7 +474,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
String taskId = "task_id_foo";
ScheduleStatus status = ScheduleStatus.FAILED;
- scheduler.setTaskStatus(Query.taskScoped(taskId), status, transitionMessage(USER));
+ scheduler.setTaskStatus(taskId, status, transitionMessage(USER));
// Expect auth is first called by an interceptor and then by SchedulerThriftInterface to extract
// the SessionContext.
// Note: This will change after AOP-style session validation passes in a SessionContext.