You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2016/01/22 08:04:41 UTC
aurora git commit: Add storage API methods for fetching amd mutating
a task by ID.
Repository: aurora
Updated Branches:
refs/heads/master c89fecbcd -> 66a4d5fdd
Add storage API methods for fetching amd mutating a task by ID.
Reviewed at https://reviews.apache.org/r/42628/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/66a4d5fd
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/66a4d5fd
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/66a4d5fd
Branch: refs/heads/master
Commit: 66a4d5fdd3ff66facccd094e6c7523b0a2d19860
Parents: c89fecb
Author: Bill Farner <wf...@apache.org>
Authored: Thu Jan 21 23:04:35 2016 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Jan 21 23:04:35 2016 -0800
----------------------------------------------------------------------
.../aurora/scheduler/http/StructDump.java | 14 ++--
.../scheduling/RescheduleCalculator.java | 6 +-
.../scheduler/state/StateManagerImpl.java | 46 +++++---------
.../aurora/scheduler/storage/Storage.java | 8 ++-
.../aurora/scheduler/storage/TaskStore.java | 26 ++++++--
.../scheduler/storage/db/DbTaskStore.java | 57 +++++++++++++----
.../storage/log/WriteAheadStorage.java | 19 ++++--
.../scheduler/storage/mem/MemTaskStore.java | 67 ++++++++++++++------
.../RescheduleCalculatorImplTest.java | 11 ++--
.../scheduler/state/StateManagerImplTest.java | 11 ++--
.../storage/AbstractCronJobStoreTest.java | 5 +-
.../storage/AbstractTaskStoreTest.java | 35 +++++-----
.../scheduler/storage/log/LogStorageTest.java | 41 ++++++------
.../storage/testing/StorageTestUtil.java | 9 +++
14 files changed, 213 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index 4fa5254..f84767a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -23,15 +23,14 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
import org.apache.aurora.common.thrift.Util;
import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.thrift.TBase;
import static java.util.Objects.requireNonNull;
@@ -73,13 +72,10 @@ public class StructDump extends JerseyTemplateServlet {
public Response dumpJob(
@PathParam("task") final String taskId) {
- return dumpEntity("Task " + taskId, storeProvider -> {
- // Deep copy the struct to sidestep any subclass trickery inside the storage system.
- return Optional.fromNullable(Iterables.getOnlyElement(
- storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
- null)
- .newBuilder());
- });
+ return dumpEntity(
+ "Task " + taskId,
+ storeProvider ->
+ storeProvider.getTaskStore().fetchTask(taskId).transform(IScheduledTask::newBuilder));
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
index c136d1a..4b0ef81 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
@@ -32,7 +32,6 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.common.util.Random;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -147,10 +146,7 @@ public interface RescheduleCalculator {
return Optional.absent();
}
- Iterable<IScheduledTask> res =
- Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
- return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+ return Storage.Util.fetchTask(storage, task.getAncestorId());
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 720b5e5..e5b2f41 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,7 +31,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -47,6 +46,7 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.state.SideEffect.Action;
@@ -163,18 +163,16 @@ public class StateManagerImpl implements StateManager {
public IAssignedTask assignTask(
MutableStoreProvider storeProvider,
String taskId,
- final String slaveHost,
- final SlaveID slaveId,
- final Map<String, Integer> assignedPorts) {
+ String slaveHost,
+ SlaveID slaveId,
+ Map<String, Integer> assignedPorts) {
checkNotBlank(taskId);
checkNotBlank(slaveHost);
requireNonNull(slaveId);
requireNonNull(assignedPorts);
- Query.Builder query = Query.taskScoped(taskId);
-
- storeProvider.getUnsafeTaskStore().mutateTasks(query,
+ IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId,
task -> {
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask()
@@ -182,7 +180,7 @@ public class StateManagerImpl implements StateManager {
.setSlaveHost(slaveHost)
.setSlaveId(slaveId.getValue());
return IScheduledTask.build(builder);
- });
+ }).get();
StateChangeResult changeResult = updateTaskAndExternalState(
storeProvider.getUnsafeTaskStore(),
@@ -195,10 +193,7 @@ public class StateManagerImpl implements StateManager {
changeResult == SUCCESS,
"Attempt to assign task " + taskId + " to " + slaveHost + " failed");
- return Iterables.getOnlyElement(
- Iterables.transform(
- storeProvider.getTaskStore().fetchTasks(query),
- IScheduledTask::getAssignedTask));
+ return mutated.getAssignedTask();
}
@VisibleForTesting
@@ -219,9 +214,7 @@ public class StateManagerImpl implements StateManager {
ScheduleStatus targetState,
Optional<String> transitionMessage) {
- Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
- taskStore.fetchTasks(Query.taskScoped(taskId)),
- null));
+ Optional<IScheduledTask> task = taskStore.fetchTask(taskId);
// CAS operation fails if the task does not exist, or the states don't match.
if (casState.isPresent()
@@ -264,34 +257,32 @@ public class StateManagerImpl implements StateManager {
private StateChangeResult updateTaskAndExternalState(
TaskStore.Mutable taskStore,
String taskId,
- // Note: This argument is deliberately non-final, and should not be made final.
+ // Note: This argument should be used with caution.
// This is because using the captured value within the storage operation below is
// highly-risky, since it doesn't necessarily represent the value in storage.
// As a result, it would be easy to accidentally clobber mutations.
Optional<IScheduledTask> task,
- final Optional<ScheduleStatus> targetState,
- final Optional<String> transitionMessage) {
+ Optional<ScheduleStatus> targetState,
+ Optional<String> transitionMessage) {
if (task.isPresent()) {
Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
}
- final List<PubsubEvent> events = Lists.newArrayList();
+ List<PubsubEvent> events = Lists.newArrayList();
- final TaskStateMachine stateMachine = task.isPresent()
+ TaskStateMachine stateMachine = task.isPresent()
? new TaskStateMachine(task.get())
: new TaskStateMachine(taskId);
TransitionResult result = stateMachine.updateState(targetState);
- Query.Builder query = Query.taskScoped(taskId);
for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
- Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
- Iterables.getOnlyElement(taskStore.fetchTasks(query), null));
+ Optional<IScheduledTask> upToDateTask = taskStore.fetchTask(taskId);
switch (sideEffect.getAction()) {
case INCREMENT_FAILURES:
- taskStore.mutateTasks(query, task1 -> IScheduledTask.build(
+ taskStore.mutateTask(taskId, task1 -> IScheduledTask.build(
task1.newBuilder().setFailureCount(task1.getFailureCount() + 1)));
break;
@@ -300,7 +291,7 @@ public class StateManagerImpl implements StateManager {
upToDateTask.isPresent(),
"Operation expected task " + taskId + " to be present.");
- taskStore.mutateTasks(query, task1 -> {
+ Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, task1 -> {
ScheduledTask mutableTask = task1.newBuilder();
mutableTask.setStatus(targetState.get());
mutableTask.addToTaskEvents(new TaskEvent()
@@ -310,10 +301,7 @@ public class StateManagerImpl implements StateManager {
.setScheduler(LOCAL_HOST_SUPPLIER.get()));
return IScheduledTask.build(mutableTask);
});
- events.add(
- PubsubEvent.TaskStateChange.transition(
- Iterables.getOnlyElement(taskStore.fetchTasks(query)),
- stateMachine.getPreviousState()));
+ events.add(TaskStateChange.transition(mutated.get(), stateMachine.getPreviousState()));
break;
case RESCHEDULE:
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 6109158..578bb37 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -20,6 +20,8 @@ import java.lang.annotation.Target;
import javax.inject.Qualifier;
+import com.google.common.base.Optional;
+
import org.apache.aurora.scheduler.base.Query.Builder;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -291,10 +293,14 @@ public interface Storage {
* @param query Builder of the query to perform.
* @return Tasks returned from the query.
*/
- public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
+ public static Iterable<IScheduledTask> fetchTasks(Storage storage, Builder query) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
}
+ public static Optional<IScheduledTask> fetchTask(Storage storage, String taskId) {
+ return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+ }
+
public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) {
return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 62639c4..4e4f8d2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage;
import java.util.Set;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
@@ -34,8 +35,16 @@ import static com.google.common.base.CharMatcher.WHITESPACE;
public interface TaskStore {
/**
+ * Fetches a task.
+ *
+ * @param taskId ID of the task to fetch.
+ * @return The task, if it exists.
+ */
+ Optional<IScheduledTask> fetchTask(String taskId);
+
+ /**
* Fetches a read-only view of tasks matching a query and filters. Intended for use with a
- * {@link org.apache.aurora.scheduler.base.Query.Builder}.
+ * {@link Query.Builder}.
*
* @param query Builder of the query to identify tasks with.
* @return A read-only view of matching tasks.
@@ -82,14 +91,23 @@ public interface TaskStore {
void deleteTasks(Set<String> taskIds);
/**
+ * Mutates a single task, if present.
+ *
+ * @param taskId Unique ID of the task to mutate.
+ * @param mutator The mutate operation.
+ * @return The result of the mutate operation, if performed.
+ */
+ Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator);
+
+ /**
* Offers temporary mutable access to tasks. If a task ID is not found, it will be silently
* skipped, and no corresponding task will be returned.
- * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
- * and it prevents the caller from worrying about a bad query having broad impact.
*
* @param query Query to match tasks against.
* @param mutator The mutate operation.
- * @return Immutable copies of only the tasks that were mutated.
+ * @return Immutable copies <em>of only the tasks that were mutated</em>.
*/
ImmutableSet<IScheduledTask> mutateTasks(
Query.Builder query,
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
index d406134..43fda1d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
@@ -79,6 +79,14 @@ class DbTaskStore implements TaskStore.Mutable {
this.slowQueryThresholdNanos = slowQueryThreshold.as(Time.NANOSECONDS);
}
+ @Timed("db_storage_fetch_task")
+ @Override
+ public Optional<IScheduledTask> fetchTask(String taskId) {
+ requireNonNull(taskId);
+ return Optional.fromNullable(taskMapper.selectById(taskId))
+ .transform(DbScheduledTask::toImmutable);
+ }
+
@Timed("db_storage_fetch_tasks")
@Override
public ImmutableSet<IScheduledTask> fetchTasks(Builder query) {
@@ -160,28 +168,50 @@ class DbTaskStore implements TaskStore.Mutable {
}
}
- @Timed("db_storage_mutate_tasks")
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- Builder query,
+ private Function<IScheduledTask, IScheduledTask> mutateAndSave(
Function<IScheduledTask, IScheduledTask> mutator) {
- requireNonNull(query);
- requireNonNull(mutator);
-
- ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
- for (IScheduledTask original : fetchTasks(query)) {
+ return original -> {
IScheduledTask maybeMutated = mutator.apply(original);
+ requireNonNull(maybeMutated);
if (!original.equals(maybeMutated)) {
Preconditions.checkState(
Tasks.id(original).equals(Tasks.id(maybeMutated)),
"A task's ID may not be mutated.");
saveTasks(ImmutableSet.of(maybeMutated));
- mutated.add(maybeMutated);
}
- }
+ return maybeMutated;
+ };
+ }
+
+ @Timed("db_storage_mutate_task")
+ @Override
+ public Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator) {
- return mutated.build();
+ requireNonNull(taskId);
+ requireNonNull(mutator);
+
+ return fetchTask(taskId).transform(mutateAndSave(mutator));
+ }
+
+ @Timed("db_storage_mutate_tasks")
+ @Override
+ public ImmutableSet<IScheduledTask> mutateTasks(
+ Builder query,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ requireNonNull(query);
+ requireNonNull(mutator);
+
+ Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator);
+ Iterable<Optional<IScheduledTask>> mutations = matches(query)
+ .transform(original -> {
+ IScheduledTask mutateResult = mutateFunction.apply(original);
+ return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult);
+ });
+ return ImmutableSet.copyOf(Optional.presentInstances(mutations));
}
@Timed("db_storage_unsafe_modify_in_place")
@@ -189,8 +219,7 @@ class DbTaskStore implements TaskStore.Mutable {
public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
checkNotNull(taskId);
checkNotNull(taskConfiguration);
- Optional<IScheduledTask> task =
- Optional.fromNullable(Iterables.getOnlyElement(fetchTasks(Query.taskScoped(taskId)), null));
+ Optional<IScheduledTask> task = fetchTask(taskId);
if (task.isPresent()) {
deleteTasks(ImmutableSet.of(taskId));
ScheduledTask builder = task.get().newBuilder();
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index c44ff47..7283531 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -193,12 +193,21 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements
}
@Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- final Query.Builder query,
- final Function<IScheduledTask, IScheduledTask> mutator) {
+ public Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+ log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+ write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
- requireNonNull(query);
- requireNonNull(mutator);
+ return mutated;
+ }
+
+ @Override
+ public ImmutableSet<IScheduledTask> mutateTasks(
+ Query.Builder query,
+ Function<IScheduledTask, IScheduledTask> mutator) {
ImmutableSet<IScheduledTask> mutated = taskStore.mutateTasks(query, mutator);
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index c55dcc9..8fd024a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -124,13 +124,20 @@ class MemTaskStore implements TaskStore.Mutable {
taskQueriesAll = statsProvider.makeCounter("task_queries_all");
}
+ @Timed("mem_storage_fetch_task")
+ @Override
+ public Optional<IScheduledTask> fetchTask(String taskId) {
+ requireNonNull(taskId);
+ return Optional.fromNullable(tasks.get(taskId)).transform(t -> t.storedTask);
+ }
+
@Timed("mem_storage_fetch_tasks")
@Override
public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
requireNonNull(query);
long start = System.nanoTime();
- ImmutableSet<IScheduledTask> result = matches(query).transform(TO_SCHEDULED).toSet();
+ ImmutableSet<IScheduledTask> result = matches(query).toSet();
long durationNanos = System.nanoTime() - start;
boolean infoLevel = durationNanos >= slowQueryThresholdNanos;
long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
@@ -196,32 +203,50 @@ class MemTaskStore implements TaskStore.Mutable {
}
}
- @Timed("mem_storage_mutate_tasks")
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- Query.Builder query,
+ private Function<IScheduledTask, IScheduledTask> mutateAndSave(
Function<IScheduledTask, IScheduledTask> mutator) {
- requireNonNull(query);
- requireNonNull(mutator);
-
- ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
- for (Task original : matches(query).toList()) {
- IScheduledTask maybeMutated = mutator.apply(original.storedTask);
- if (!original.storedTask.equals(maybeMutated)) {
+ return original -> {
+ IScheduledTask maybeMutated = mutator.apply(original);
+ requireNonNull(maybeMutated);
+ if (!original.equals(maybeMutated)) {
Preconditions.checkState(
- Tasks.id(original.storedTask).equals(Tasks.id(maybeMutated)),
+ Tasks.id(original).equals(Tasks.id(maybeMutated)),
"A task's ID may not be mutated.");
tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
for (SecondaryIndex<?> index : secondaryIndices) {
- index.replace(original.storedTask, maybeMutated);
+ index.replace(original, maybeMutated);
}
-
- mutated.add(maybeMutated);
}
- }
+ return maybeMutated;
+ };
+ }
- return mutated.build();
+ @Timed("mem_storage_mutate_task")
+ @Override
+ public Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ return fetchTask(taskId).transform(mutateAndSave(mutator));
+ }
+
+ @Timed("mem_storage_mutate_tasks")
+ @Override
+ public ImmutableSet<IScheduledTask> mutateTasks(
+ Query.Builder query,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ requireNonNull(query);
+ requireNonNull(mutator);
+
+ Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator);
+ Iterable<Optional<IScheduledTask>> mutations = matches(query)
+ .transform(original -> {
+ IScheduledTask mutateResult = mutateFunction.apply(original);
+ return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult);
+ });
+ return ImmutableSet.copyOf(Optional.presentInstances(mutations));
}
@Timed("mem_storage_unsafe_modify_in_place")
@@ -259,7 +284,7 @@ class MemTaskStore implements TaskStore.Mutable {
.toList();
}
- private FluentIterable<Task> matches(Query.Builder query) {
+ private FluentIterable<IScheduledTask> matches(Query.Builder query) {
// Apply the query against the working set.
Optional<? extends Iterable<Task>> from = Optional.absent();
if (query.get().isSetTaskIds()) {
@@ -284,7 +309,9 @@ class MemTaskStore implements TaskStore.Mutable {
}
}
- return FluentIterable.from(from.get()).filter(queryFilter(query));
+ return FluentIterable.from(from.get())
+ .filter(queryFilter(query))
+ .transform(TO_SCHEDULED);
}
private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask;
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
index b380f21..9d21dcd 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
@@ -29,7 +29,6 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -78,7 +77,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
@Test
public void testNoPenaltyDeletedAncestor() {
String ancestorId = "a";
- storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
+ storageUtil.expectTaskFetch(ancestorId);
control.replay();
@@ -90,7 +89,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
@Test
public void testFlappingTask() {
IScheduledTask ancestor = makeFlappyTask("a");
- storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+ storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor);
long penaltyMs = 1000L;
expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
@@ -119,9 +118,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
long lastPenalty = 0L;
for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
- storageUtil.expectTaskFetch(
- Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
- taskAndPenalty.getKey());
+ storageUtil.expectTaskFetch(Tasks.id(taskAndPenalty.getKey()), taskAndPenalty.getKey());
expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
lastPenalty = taskAndPenalty.getValue();
}
@@ -137,7 +134,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
IScheduledTask ancestor = setEvents(
makeTask("a", KILLED),
ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
- storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+ storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor);
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 6d42689..498da78 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -202,7 +202,7 @@ public class StateManagerImplTest extends EasyMockTest {
.setTask(NON_SERVICE_CONFIG.newBuilder()));
assertEquals(
ImmutableSet.of(IScheduledTask.build(expected)),
- Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
+ Storage.Util.fetchTask(storage, taskId).asSet());
}
@Test
@@ -343,8 +343,7 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(taskId, HOST_A);
changeState(taskId, RUNNING);
changeState(taskId, FAILED);
- IScheduledTask rescheduledTask = Iterables.getOnlyElement(
- Storage.Util.fetchTasks(storage, Query.taskScoped(taskId2)));
+ IScheduledTask rescheduledTask = Storage.Util.fetchTask(storage, taskId2).get();
assertEquals(taskId, rescheduledTask.getAncestorId());
assertEquals(1, rescheduledTask.getFailureCount());
}
@@ -422,8 +421,7 @@ public class StateManagerImplTest extends EasyMockTest {
insertTask(task, 0);
assignTask(taskId, HOST_A, ImmutableMap.of("one", 80, "two", 81, "three", 82));
- IScheduledTask actual = Iterables.getOnlyElement(
- Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
+ IScheduledTask actual = Storage.Util.fetchTask(storage, taskId).get();
assertEquals(
requestedPorts,
@@ -453,8 +451,7 @@ public class StateManagerImplTest extends EasyMockTest {
assignTask(newTaskId, HOST_A, ImmutableMap.of("one", 86));
- IScheduledTask actual = Iterables.getOnlyElement(
- Storage.Util.fetchTasks(storage, Query.taskScoped(newTaskId)));
+ IScheduledTask actual = Storage.Util.fetchTask(storage, newTaskId).get();
assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
index a6bfc7a..22a6b43 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -114,8 +113,8 @@ public abstract class AbstractCronJobStoreTest {
saveAcceptedJob(JOB_A);
storage.write(storeProvider ->
- storeProvider.getUnsafeTaskStore().mutateTasks(
- Query.taskScoped(Tasks.id(instance)),
+ storeProvider.getUnsafeTaskStore().mutateTask(
+ Tasks.id(instance),
task -> IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.RUNNING))));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
index 5a9b6c1..1ac41d1 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
@@ -20,10 +20,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -101,6 +101,10 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
});
}
+ private Optional<IScheduledTask> fetchTask(String taskId) {
+ return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+ }
+
private Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
}
@@ -114,10 +118,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)));
}
- private ImmutableSet<IScheduledTask> mutateTasks(
- final Query.Builder query,
- final TaskMutation mutation) {
+ private Optional<IScheduledTask> mutateTask(String taskId, TaskMutation mutation) {
+ return storage.write(
+ storeProvider -> storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+ }
+ private ImmutableSet<IScheduledTask> mutateTasks(Query.Builder query, TaskMutation mutation) {
return storage.write(
storeProvider -> storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
}
@@ -261,9 +267,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(Query.statusScoped(RUNNING));
- mutateTasks(
- Query.taskScoped("a"),
- task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
+ mutateTask("a", task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
assertQueryResults(
Query.statusScoped(RUNNING),
@@ -293,10 +297,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
saveTasks(TASK_A);
assertTrue(unsafeModifyInPlace(taskId, updated));
- Query.Builder query = Query.taskScoped(taskId);
- ITaskConfig stored =
- Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask();
- assertEquals(updated, stored);
+ assertEquals(updated, fetchTask(taskId).get().getAssignedTask().getTask());
deleteTasks(taskId);
assertFalse(unsafeModifyInPlace(taskId, updated));
@@ -413,22 +414,22 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
final IScheduledTask b = setHost(a, HOST_A);
- Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)),
+ Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
task -> {
assertEquals(a, task);
return b;
});
- assertEquals(ImmutableSet.of(b), result);
+ assertEquals(Optional.of(b), result);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b);
// Unrealistic behavior, but proving that the secondary index can handle key mutations.
final IScheduledTask c = setHost(b, HOST_B);
- Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)),
+ Optional<IScheduledTask> result2 = mutateTask(Tasks.id(a),
task -> {
assertEquals(b, task);
return c;
});
- assertEquals(ImmutableSet.of(c), result2);
+ assertEquals(Optional.of(c), result2);
assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c);
deleteTasks(Tasks.id(a));
@@ -444,12 +445,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a);
final IScheduledTask b = unsetHost(a);
- Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)),
+ Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
task -> {
assertEquals(a, task);
return b;
});
- assertEquals(ImmutableSet.of(b), result);
+ assertEquals(Optional.of(b), result);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 4305270..7382eca 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -78,7 +78,6 @@ import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
@@ -542,22 +541,22 @@ public class LogStorageTest extends EasyMockTest {
@Test
public void testMutateTasks() throws Exception {
- Query.Builder query = Query.taskScoped("fred");
+ String taskId = "fred";
Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
new AbstractMutationFixture() {
@Override
protected void setupExpectations() throws Exception {
storageUtil.expectWrite();
- expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
+ Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
.andReturn(null);
}
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
}
}.run();
}
@@ -588,28 +587,28 @@ public class LogStorageTest extends EasyMockTest {
@Test
public void testNestedTransactions() throws Exception {
- Query.Builder query = Query.taskScoped("fred");
+ String taskId = "fred";
Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
- ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
new AbstractMutationFixture() {
@Override
protected void setupExpectations() throws Exception {
storageUtil.expectWrite();
- expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
storageUtil.taskStore.deleteTasks(tasksToRemove);
streamMatcher.expectTransaction(
- Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))),
+ Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
Op.removeTasks(new RemoveTasks(tasksToRemove)))
.andReturn(position);
}
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
logStorage.write((NoResult.Quiet)
innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
@@ -619,10 +618,10 @@ public class LogStorageTest extends EasyMockTest {
@Test
public void testSaveAndMutateTasks() throws Exception {
- Query.Builder query = Query.taskScoped("fred");
+ String taskId = "fred";
Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
- ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
new AbstractMutationFixture() {
@Override
@@ -631,28 +630,28 @@ public class LogStorageTest extends EasyMockTest {
storageUtil.taskStore.saveTasks(saved);
// Nested transaction with result.
- expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
// Resulting stream operation.
streamMatcher.expectTransaction(Op.saveTasks(
- new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
+ new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
.andReturn(null);
}
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().saveTasks(saved);
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
}
}.run();
}
@Test
public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
- Query.Builder query = Query.taskScoped("fred");
+ String taskId = "fred";
Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
- ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING));
+ Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
new AbstractMutationFixture() {
@Override
@@ -661,14 +660,14 @@ public class LogStorageTest extends EasyMockTest {
storageUtil.taskStore.saveTasks(saved);
// Nested transaction with result.
- expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+ expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
// Resulting stream operation.
streamMatcher.expectTransaction(
Op.saveTasks(new SaveTasks(
ImmutableSet.<ScheduledTask>builder()
.addAll(IScheduledTask.toBuildersList(saved))
- .addAll(IScheduledTask.toBuildersList(mutated))
+ .add(mutated.get().newBuilder())
.build())))
.andReturn(position);
}
@@ -676,7 +675,7 @@ public class LogStorageTest extends EasyMockTest {
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().saveTasks(saved);
- assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+ assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
}
}.run();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index bf344a4..21d26b3 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.storage.testing;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -114,6 +115,14 @@ public class StorageTestUtil {
return expect(taskStore.fetchTasks(query)).andReturn(result);
}
+ public IExpectationSetters<?> expectTaskFetch(String taskId, IScheduledTask result) {
+ return expect(taskStore.fetchTask(taskId)).andReturn(Optional.of(result));
+ }
+
+ public IExpectationSetters<?> expectTaskFetch(String taskId) {
+ return expect(taskStore.fetchTask(taskId)).andReturn(Optional.absent());
+ }
+
public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
}