You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2016/01/22 08:04:41 UTC

aurora git commit: Add storage API methods for fetching and mutating a task by ID.

Repository: aurora
Updated Branches:
  refs/heads/master c89fecbcd -> 66a4d5fdd


Add storage API methods for fetching and mutating a task by ID.

Reviewed at https://reviews.apache.org/r/42628/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/66a4d5fd
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/66a4d5fd
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/66a4d5fd

Branch: refs/heads/master
Commit: 66a4d5fdd3ff66facccd094e6c7523b0a2d19860
Parents: c89fecb
Author: Bill Farner <wf...@apache.org>
Authored: Thu Jan 21 23:04:35 2016 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Jan 21 23:04:35 2016 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/http/StructDump.java       | 14 ++--
 .../scheduling/RescheduleCalculator.java        |  6 +-
 .../scheduler/state/StateManagerImpl.java       | 46 +++++---------
 .../aurora/scheduler/storage/Storage.java       |  8 ++-
 .../aurora/scheduler/storage/TaskStore.java     | 26 ++++++--
 .../scheduler/storage/db/DbTaskStore.java       | 57 +++++++++++++----
 .../storage/log/WriteAheadStorage.java          | 19 ++++--
 .../scheduler/storage/mem/MemTaskStore.java     | 67 ++++++++++++++------
 .../RescheduleCalculatorImplTest.java           | 11 ++--
 .../scheduler/state/StateManagerImplTest.java   | 11 ++--
 .../storage/AbstractCronJobStoreTest.java       |  5 +-
 .../storage/AbstractTaskStoreTest.java          | 35 +++++-----
 .../scheduler/storage/log/LogStorageTest.java   | 41 ++++++------
 .../storage/testing/StorageTestUtil.java        |  9 +++
 14 files changed, 213 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index 4fa5254..f84767a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -23,15 +23,14 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
 
 import org.apache.aurora.common.thrift.Util;
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.thrift.TBase;
 
 import static java.util.Objects.requireNonNull;
@@ -73,13 +72,10 @@ public class StructDump extends JerseyTemplateServlet {
   public Response dumpJob(
       @PathParam("task") final String taskId) {
 
-    return dumpEntity("Task " + taskId, storeProvider -> {
-      // Deep copy the struct to sidestep any subclass trickery inside the storage system.
-      return Optional.fromNullable(Iterables.getOnlyElement(
-              storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-              null)
-          .newBuilder());
-    });
+    return dumpEntity(
+        "Task " + taskId,
+        storeProvider ->
+            storeProvider.getTaskStore().fetchTask(taskId).transform(IScheduledTask::newBuilder));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
index c136d1a..4b0ef81 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
@@ -32,7 +32,6 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.BackoffStrategy;
 import org.apache.aurora.common.util.Random;
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -147,10 +146,7 @@ public interface RescheduleCalculator {
         return Optional.absent();
       }
 
-      Iterable<IScheduledTask> res =
-          Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
-      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+      return Storage.Util.fetchTask(storage, task.getAncestorId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 720b5e5..e5b2f41 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -31,7 +31,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -47,6 +46,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.state.SideEffect.Action;
@@ -163,18 +163,16 @@ public class StateManagerImpl implements StateManager {
   public IAssignedTask assignTask(
       MutableStoreProvider storeProvider,
       String taskId,
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Map<String, Integer> assignedPorts) {
+      String slaveHost,
+      SlaveID slaveId,
+      Map<String, Integer> assignedPorts) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
     requireNonNull(slaveId);
     requireNonNull(assignedPorts);
 
-    Query.Builder query = Query.taskScoped(taskId);
-
-    storeProvider.getUnsafeTaskStore().mutateTasks(query,
+    IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId,
         task -> {
           ScheduledTask builder = task.newBuilder();
           builder.getAssignedTask()
@@ -182,7 +180,7 @@ public class StateManagerImpl implements StateManager {
               .setSlaveHost(slaveHost)
               .setSlaveId(slaveId.getValue());
           return IScheduledTask.build(builder);
-        });
+        }).get();
 
     StateChangeResult changeResult = updateTaskAndExternalState(
         storeProvider.getUnsafeTaskStore(),
@@ -195,10 +193,7 @@ public class StateManagerImpl implements StateManager {
         changeResult == SUCCESS,
         "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
 
-    return Iterables.getOnlyElement(
-        Iterables.transform(
-            storeProvider.getTaskStore().fetchTasks(query),
-            IScheduledTask::getAssignedTask));
+    return mutated.getAssignedTask();
   }
 
   @VisibleForTesting
@@ -219,9 +214,7 @@ public class StateManagerImpl implements StateManager {
       ScheduleStatus targetState,
       Optional<String> transitionMessage) {
 
-    Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
-        taskStore.fetchTasks(Query.taskScoped(taskId)),
-        null));
+    Optional<IScheduledTask> task = taskStore.fetchTask(taskId);
 
     // CAS operation fails if the task does not exist, or the states don't match.
     if (casState.isPresent()
@@ -264,34 +257,32 @@ public class StateManagerImpl implements StateManager {
   private StateChangeResult updateTaskAndExternalState(
       TaskStore.Mutable taskStore,
       String taskId,
-      // Note: This argument is deliberately non-final, and should not be made final.
+      // Note: This argument should be used with caution.
       // This is because using the captured value within the storage operation below is
       // highly-risky, since it doesn't necessarily represent the value in storage.
       // As a result, it would be easy to accidentally clobber mutations.
       Optional<IScheduledTask> task,
-      final Optional<ScheduleStatus> targetState,
-      final Optional<String> transitionMessage) {
+      Optional<ScheduleStatus> targetState,
+      Optional<String> transitionMessage) {
 
     if (task.isPresent()) {
       Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
     }
 
-    final List<PubsubEvent> events = Lists.newArrayList();
+    List<PubsubEvent> events = Lists.newArrayList();
 
-    final TaskStateMachine stateMachine = task.isPresent()
+    TaskStateMachine stateMachine = task.isPresent()
         ? new TaskStateMachine(task.get())
         : new TaskStateMachine(taskId);
 
     TransitionResult result = stateMachine.updateState(targetState);
-    Query.Builder query = Query.taskScoped(taskId);
 
     for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
-      Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
-          Iterables.getOnlyElement(taskStore.fetchTasks(query), null));
+      Optional<IScheduledTask> upToDateTask = taskStore.fetchTask(taskId);
 
       switch (sideEffect.getAction()) {
         case INCREMENT_FAILURES:
-          taskStore.mutateTasks(query, task1 -> IScheduledTask.build(
+          taskStore.mutateTask(taskId, task1 -> IScheduledTask.build(
               task1.newBuilder().setFailureCount(task1.getFailureCount() + 1)));
           break;
 
@@ -300,7 +291,7 @@ public class StateManagerImpl implements StateManager {
               upToDateTask.isPresent(),
               "Operation expected task " + taskId + " to be present.");
 
-          taskStore.mutateTasks(query, task1 -> {
+          Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, task1 -> {
             ScheduledTask mutableTask = task1.newBuilder();
             mutableTask.setStatus(targetState.get());
             mutableTask.addToTaskEvents(new TaskEvent()
@@ -310,10 +301,7 @@ public class StateManagerImpl implements StateManager {
                 .setScheduler(LOCAL_HOST_SUPPLIER.get()));
             return IScheduledTask.build(mutableTask);
           });
-          events.add(
-              PubsubEvent.TaskStateChange.transition(
-                  Iterables.getOnlyElement(taskStore.fetchTasks(query)),
-                  stateMachine.getPreviousState()));
+          events.add(TaskStateChange.transition(mutated.get(), stateMachine.getPreviousState()));
           break;
 
         case RESCHEDULE:

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 6109158..578bb37 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -20,6 +20,8 @@ import java.lang.annotation.Target;
 
 import javax.inject.Qualifier;
 
+import com.google.common.base.Optional;
+
 import org.apache.aurora.scheduler.base.Query.Builder;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -291,10 +293,14 @@ public interface Storage {
      * @param query Builder of the query to perform.
      * @return Tasks returned from the query.
      */
-    public static Iterable<IScheduledTask> fetchTasks(Storage storage, final Builder query) {
+    public static Iterable<IScheduledTask> fetchTasks(Storage storage, Builder query) {
       return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
     }
 
+    public static Optional<IScheduledTask> fetchTask(Storage storage, String taskId) {
+      return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+    }
+
     public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) {
       return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs());
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 62639c4..4e4f8d2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.storage;
 import java.util.Set;
 
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 
@@ -34,8 +35,16 @@ import static com.google.common.base.CharMatcher.WHITESPACE;
 public interface TaskStore {
 
   /**
+   * Fetches a task.
+   *
+   * @param taskId ID of the task to fetch.
+   * @return The task, if it exists.
+   */
+  Optional<IScheduledTask> fetchTask(String taskId);
+
+  /**
    * Fetches a read-only view of tasks matching a query and filters. Intended for use with a
-   * {@link org.apache.aurora.scheduler.base.Query.Builder}.
+   * {@link Query.Builder}.
    *
    * @param query Builder of the query to identify tasks with.
    * @return A read-only view of matching tasks.
@@ -82,14 +91,23 @@ public interface TaskStore {
     void deleteTasks(Set<String> taskIds);
 
     /**
+     * Mutates a single task, if present.
+     *
+     * @param taskId Unique ID of the task to mutate.
+     * @param mutator The mutate operation.
+     * @return The result of the mutate operation, if performed.
+     */
+    Optional<IScheduledTask> mutateTask(
+        String taskId,
+        Function<IScheduledTask, IScheduledTask> mutator);
+
+    /**
      * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
      * skipped, and no corresponding task will be returned.
-     * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
-     * and it prevents the caller from worrying about a bad query having broad impact.
      *
      * @param query Query to match tasks against.
      * @param mutator The mutate operation.
-     * @return Immutable copies of only the tasks that were mutated.
+     * @return Immutable copies <em>of only the tasks that were mutated</em>.
      */
     ImmutableSet<IScheduledTask> mutateTasks(
         Query.Builder query,

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
index d406134..43fda1d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java
@@ -79,6 +79,14 @@ class DbTaskStore implements TaskStore.Mutable {
     this.slowQueryThresholdNanos =  slowQueryThreshold.as(Time.NANOSECONDS);
   }
 
+  @Timed("db_storage_fetch_task")
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    requireNonNull(taskId);
+    return Optional.fromNullable(taskMapper.selectById(taskId))
+        .transform(DbScheduledTask::toImmutable);
+  }
+
   @Timed("db_storage_fetch_tasks")
   @Override
   public ImmutableSet<IScheduledTask> fetchTasks(Builder query) {
@@ -160,28 +168,50 @@ class DbTaskStore implements TaskStore.Mutable {
     }
   }
 
-  @Timed("db_storage_mutate_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      Builder query,
+  private Function<IScheduledTask, IScheduledTask> mutateAndSave(
       Function<IScheduledTask, IScheduledTask> mutator) {
 
-    requireNonNull(query);
-    requireNonNull(mutator);
-
-    ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
-    for (IScheduledTask original : fetchTasks(query)) {
+    return original -> {
       IScheduledTask maybeMutated = mutator.apply(original);
+      requireNonNull(maybeMutated);
       if (!original.equals(maybeMutated)) {
         Preconditions.checkState(
             Tasks.id(original).equals(Tasks.id(maybeMutated)),
             "A task's ID may not be mutated.");
         saveTasks(ImmutableSet.of(maybeMutated));
-        mutated.add(maybeMutated);
       }
-    }
+      return maybeMutated;
+    };
+  }
+
+  @Timed("db_storage_mutate_task")
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
 
-    return mutated.build();
+    requireNonNull(taskId);
+    requireNonNull(mutator);
+
+    return fetchTask(taskId).transform(mutateAndSave(mutator));
+  }
+
+  @Timed("db_storage_mutate_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    requireNonNull(query);
+    requireNonNull(mutator);
+
+    Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator);
+    Iterable<Optional<IScheduledTask>> mutations = matches(query)
+        .transform(original -> {
+          IScheduledTask mutateResult = mutateFunction.apply(original);
+          return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult);
+        });
+    return ImmutableSet.copyOf(Optional.presentInstances(mutations));
   }
 
   @Timed("db_storage_unsafe_modify_in_place")
@@ -189,8 +219,7 @@ class DbTaskStore implements TaskStore.Mutable {
   public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
     checkNotNull(taskId);
     checkNotNull(taskConfiguration);
-    Optional<IScheduledTask> task =
-        Optional.fromNullable(Iterables.getOnlyElement(fetchTasks(Query.taskScoped(taskId)), null));
+    Optional<IScheduledTask> task = fetchTask(taskId);
     if (task.isPresent()) {
       deleteTasks(ImmutableSet.of(taskId));
       ScheduledTask builder = task.get().newBuilder();

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index c44ff47..7283531 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -193,12 +193,21 @@ class WriteAheadStorage extends WriteAheadStorageForwarder implements
   }
 
   @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      final Query.Builder query,
-      final Function<IScheduledTask, IScheduledTask> mutator) {
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+    log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+    write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
 
-    requireNonNull(query);
-    requireNonNull(mutator);
+    return mutated;
+  }
+
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Query.Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
 
     ImmutableSet<IScheduledTask> mutated = taskStore.mutateTasks(query, mutator);
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index c55dcc9..8fd024a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -124,13 +124,20 @@ class MemTaskStore implements TaskStore.Mutable {
     taskQueriesAll = statsProvider.makeCounter("task_queries_all");
   }
 
+  @Timed("mem_storage_fetch_task")
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    requireNonNull(taskId);
+    return Optional.fromNullable(tasks.get(taskId)).transform(t -> t.storedTask);
+  }
+
   @Timed("mem_storage_fetch_tasks")
   @Override
   public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
     requireNonNull(query);
 
     long start = System.nanoTime();
-    ImmutableSet<IScheduledTask> result = matches(query).transform(TO_SCHEDULED).toSet();
+    ImmutableSet<IScheduledTask> result = matches(query).toSet();
     long durationNanos = System.nanoTime() - start;
     boolean infoLevel = durationNanos >= slowQueryThresholdNanos;
     long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
@@ -196,32 +203,50 @@ class MemTaskStore implements TaskStore.Mutable {
     }
   }
 
-  @Timed("mem_storage_mutate_tasks")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      Query.Builder query,
+  private Function<IScheduledTask, IScheduledTask> mutateAndSave(
       Function<IScheduledTask, IScheduledTask> mutator) {
 
-    requireNonNull(query);
-    requireNonNull(mutator);
-
-    ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
-    for (Task original : matches(query).toList()) {
-      IScheduledTask maybeMutated = mutator.apply(original.storedTask);
-      if (!original.storedTask.equals(maybeMutated)) {
+    return original -> {
+      IScheduledTask maybeMutated = mutator.apply(original);
+      requireNonNull(maybeMutated);
+      if (!original.equals(maybeMutated)) {
         Preconditions.checkState(
-            Tasks.id(original.storedTask).equals(Tasks.id(maybeMutated)),
+            Tasks.id(original).equals(Tasks.id(maybeMutated)),
             "A task's ID may not be mutated.");
         tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
         for (SecondaryIndex<?> index : secondaryIndices) {
-          index.replace(original.storedTask, maybeMutated);
+          index.replace(original, maybeMutated);
         }
-
-        mutated.add(maybeMutated);
       }
-    }
+      return maybeMutated;
+    };
+  }
 
-    return mutated.build();
+  @Timed("mem_storage_mutate_task")
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    return fetchTask(taskId).transform(mutateAndSave(mutator));
+  }
+
+  @Timed("mem_storage_mutate_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Query.Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    requireNonNull(query);
+    requireNonNull(mutator);
+
+    Function<IScheduledTask, IScheduledTask> mutateFunction = mutateAndSave(mutator);
+    Iterable<Optional<IScheduledTask>> mutations = matches(query)
+        .transform(original -> {
+          IScheduledTask mutateResult = mutateFunction.apply(original);
+          return original.equals(mutateResult) ? Optional.absent() : Optional.of(mutateResult);
+        });
+    return ImmutableSet.copyOf(Optional.presentInstances(mutations));
   }
 
   @Timed("mem_storage_unsafe_modify_in_place")
@@ -259,7 +284,7 @@ class MemTaskStore implements TaskStore.Mutable {
         .toList();
   }
 
-  private FluentIterable<Task> matches(Query.Builder query) {
+  private FluentIterable<IScheduledTask> matches(Query.Builder query) {
     // Apply the query against the working set.
     Optional<? extends Iterable<Task>> from = Optional.absent();
     if (query.get().isSetTaskIds()) {
@@ -284,7 +309,9 @@ class MemTaskStore implements TaskStore.Mutable {
       }
     }
 
-    return FluentIterable.from(from.get()).filter(queryFilter(query));
+    return FluentIterable.from(from.get())
+        .filter(queryFilter(query))
+        .transform(TO_SCHEDULED);
   }
 
   private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask;

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
index b380f21..9d21dcd 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
@@ -29,7 +29,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -78,7 +77,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
   @Test
   public void testNoPenaltyDeletedAncestor() {
     String ancestorId = "a";
-    storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
+    storageUtil.expectTaskFetch(ancestorId);
 
     control.replay();
 
@@ -90,7 +89,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
   @Test
   public void testFlappingTask() {
     IScheduledTask ancestor = makeFlappyTask("a");
-    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+    storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor);
     long penaltyMs = 1000L;
     expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
 
@@ -119,9 +118,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
 
     long lastPenalty = 0L;
     for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
-      storageUtil.expectTaskFetch(
-          Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
-          taskAndPenalty.getKey());
+      storageUtil.expectTaskFetch(Tasks.id(taskAndPenalty.getKey()), taskAndPenalty.getKey());
       expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
       lastPenalty = taskAndPenalty.getValue();
     }
@@ -137,7 +134,7 @@ public class RescheduleCalculatorImplTest extends EasyMockTest {
     IScheduledTask ancestor = setEvents(
         makeTask("a", KILLED),
         ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
-    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+    storageUtil.expectTaskFetch(Tasks.id(ancestor), ancestor);
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 6d42689..498da78 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -202,7 +202,7 @@ public class StateManagerImplTest extends EasyMockTest {
             .setTask(NON_SERVICE_CONFIG.newBuilder()));
     assertEquals(
         ImmutableSet.of(IScheduledTask.build(expected)),
-        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
+        Storage.Util.fetchTask(storage, taskId).asSet());
   }
 
   @Test
@@ -343,8 +343,7 @@ public class StateManagerImplTest extends EasyMockTest {
     assignTask(taskId, HOST_A);
     changeState(taskId, RUNNING);
     changeState(taskId, FAILED);
-    IScheduledTask rescheduledTask = Iterables.getOnlyElement(
-        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId2)));
+    IScheduledTask rescheduledTask = Storage.Util.fetchTask(storage, taskId2).get();
     assertEquals(taskId, rescheduledTask.getAncestorId());
     assertEquals(1, rescheduledTask.getFailureCount());
   }
@@ -422,8 +421,7 @@ public class StateManagerImplTest extends EasyMockTest {
     insertTask(task, 0);
     assignTask(taskId, HOST_A, ImmutableMap.of("one", 80, "two", 81, "three", 82));
 
-    IScheduledTask actual = Iterables.getOnlyElement(
-        Storage.Util.fetchTasks(storage, Query.taskScoped(taskId)));
+    IScheduledTask actual = Storage.Util.fetchTask(storage, taskId).get();
 
     assertEquals(
         requestedPorts,
@@ -453,8 +451,7 @@ public class StateManagerImplTest extends EasyMockTest {
 
     assignTask(newTaskId, HOST_A, ImmutableMap.of("one", 86));
 
-    IScheduledTask actual = Iterables.getOnlyElement(
-        Storage.Util.fetchTasks(storage, Query.taskScoped(newTaskId)));
+    IScheduledTask actual = Storage.Util.fetchTask(storage, newTaskId).get();
 
     assertEquals(ImmutableMap.of("one", 86), actual.getAssignedTask().getAssignedPorts());
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
index a6bfc7a..22a6b43 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -114,8 +113,8 @@ public abstract class AbstractCronJobStoreTest {
     saveAcceptedJob(JOB_A);
 
     storage.write(storeProvider ->
-        storeProvider.getUnsafeTaskStore().mutateTasks(
-            Query.taskScoped(Tasks.id(instance)),
+        storeProvider.getUnsafeTaskStore().mutateTask(
+            Tasks.id(instance),
             task -> IScheduledTask.build(task.newBuilder().setStatus(ScheduleStatus.RUNNING))));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
index 5a9b6c1..1ac41d1 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractTaskStoreTest.java
@@ -20,10 +20,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -101,6 +101,10 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
     });
   }
 
+  private Optional<IScheduledTask> fetchTask(String taskId) {
+    return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
+  }
+
   private Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
     return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
   }
@@ -114,10 +118,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
         storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)));
   }
 
-  private ImmutableSet<IScheduledTask> mutateTasks(
-      final Query.Builder query,
-      final TaskMutation mutation) {
+  private Optional<IScheduledTask> mutateTask(String taskId, TaskMutation mutation) {
+    return storage.write(
+        storeProvider -> storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
+  }
 
+  private ImmutableSet<IScheduledTask> mutateTasks(Query.Builder query, TaskMutation mutation) {
     return storage.write(
         storeProvider -> storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
   }
@@ -261,9 +267,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
     saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
     assertQueryResults(Query.statusScoped(RUNNING));
 
-    mutateTasks(
-        Query.taskScoped("a"),
-        task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
+    mutateTask("a", task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
 
     assertQueryResults(
         Query.statusScoped(RUNNING),
@@ -293,10 +297,7 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
 
     saveTasks(TASK_A);
     assertTrue(unsafeModifyInPlace(taskId, updated));
-    Query.Builder query = Query.taskScoped(taskId);
-    ITaskConfig stored =
-        Iterables.getOnlyElement(fetchTasks(query)).getAssignedTask().getTask();
-    assertEquals(updated, stored);
+    assertEquals(updated, fetchTask(taskId).get().getAssignedTask().getTask());
 
     deleteTasks(taskId);
     assertFalse(unsafeModifyInPlace(taskId, updated));
@@ -413,22 +414,22 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
     assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
 
     final IScheduledTask b = setHost(a, HOST_A);
-    Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)),
+    Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
         task -> {
           assertEquals(a, task);
           return b;
         });
-    assertEquals(ImmutableSet.of(b), result);
+    assertEquals(Optional.of(b), result);
     assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b);
 
     // Unrealistic behavior, but proving that the secondary index can handle key mutations.
     final IScheduledTask c = setHost(b, HOST_B);
-    Set<IScheduledTask> result2 = mutateTasks(Query.taskScoped(Tasks.id(a)),
+    Optional<IScheduledTask> result2 = mutateTask(Tasks.id(a),
         task -> {
           assertEquals(b, task);
           return c;
         });
-    assertEquals(ImmutableSet.of(c), result2);
+    assertEquals(Optional.of(c), result2);
     assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c);
 
     deleteTasks(Tasks.id(a));
@@ -444,12 +445,12 @@ public abstract class AbstractTaskStoreTest extends TearDownTestCase {
     assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a);
 
     final IScheduledTask b = unsetHost(a);
-    Set<IScheduledTask> result = mutateTasks(Query.taskScoped(Tasks.id(a)),
+    Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
         task -> {
           assertEquals(a, task);
           return b;
         });
-    assertEquals(ImmutableSet.of(b), result);
+    assertEquals(Optional.of(b), result);
     assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
     assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 4305270..7382eca 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -78,7 +78,6 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -542,22 +541,22 @@ public class LogStorageTest extends EasyMockTest {
 
   @Test
   public void testMutateTasks() throws Exception {
-    Query.Builder query = Query.taskScoped("fred");
+    String taskId = "fred";
     Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
     new AbstractMutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
         storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
         streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
+            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
             .andReturn(null);
       }
 
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
       }
     }.run();
   }
@@ -588,28 +587,28 @@ public class LogStorageTest extends EasyMockTest {
 
   @Test
   public void testNestedTransactions() throws Exception {
-    Query.Builder query = Query.taskScoped("fred");
+    String taskId = "fred";
     Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.STARTING));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.STARTING));
     ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
 
     new AbstractMutationFixture() {
       @Override
       protected void setupExpectations() throws Exception {
         storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
 
         storageUtil.taskStore.deleteTasks(tasksToRemove);
 
         streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))),
+            Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
             Op.removeTasks(new RemoveTasks(tasksToRemove)))
             .andReturn(position);
       }
 
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
 
         logStorage.write((NoResult.Quiet)
             innerProvider -> innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
@@ -619,10 +618,10 @@ public class LogStorageTest extends EasyMockTest {
 
   @Test
   public void testSaveAndMutateTasks() throws Exception {
-    Query.Builder query = Query.taskScoped("fred");
+    String taskId = "fred";
     Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
     Set<IScheduledTask> saved = ImmutableSet.of(task("a", ScheduleStatus.INIT));
-    ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
 
     new AbstractMutationFixture() {
       @Override
@@ -631,28 +630,28 @@ public class LogStorageTest extends EasyMockTest {
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
 
         // Resulting stream operation.
         streamMatcher.expectTransaction(Op.saveTasks(
-            new SaveTasks(IScheduledTask.toBuildersSet(mutated))))
+            new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
             .andReturn(null);
       }
 
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
       }
     }.run();
   }
 
   @Test
   public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
-    Query.Builder query = Query.taskScoped("fred");
+    String taskId = "fred";
     Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
     Set<IScheduledTask> saved = ImmutableSet.of(task("b", ScheduleStatus.INIT));
-    ImmutableSet<IScheduledTask> mutated = ImmutableSet.of(task("a", ScheduleStatus.PENDING));
+    Optional<IScheduledTask> mutated = Optional.of(task("a", ScheduleStatus.PENDING));
 
     new AbstractMutationFixture() {
       @Override
@@ -661,14 +660,14 @@ public class LogStorageTest extends EasyMockTest {
         storageUtil.taskStore.saveTasks(saved);
 
         // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTasks(query, mutation)).andReturn(mutated);
+        expect(storageUtil.taskStore.mutateTask(taskId, mutation)).andReturn(mutated);
 
         // Resulting stream operation.
         streamMatcher.expectTransaction(
             Op.saveTasks(new SaveTasks(
                 ImmutableSet.<ScheduledTask>builder()
                     .addAll(IScheduledTask.toBuildersList(saved))
-                    .addAll(IScheduledTask.toBuildersList(mutated))
+                    .add(mutated.get().newBuilder())
                     .build())))
             .andReturn(position);
       }
@@ -676,7 +675,7 @@ public class LogStorageTest extends EasyMockTest {
       @Override
       protected void performMutations(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTasks(query, mutation));
+        assertEquals(mutated, storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
       }
     }.run();
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/66a4d5fd/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index bf344a4..21d26b3 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.testing;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -114,6 +115,14 @@ public class StorageTestUtil {
     return expect(taskStore.fetchTasks(query)).andReturn(result);
   }
 
+  public IExpectationSetters<?> expectTaskFetch(String taskId, IScheduledTask result) {
+    return expect(taskStore.fetchTask(taskId)).andReturn(Optional.of(result));
+  }
+
+  public IExpectationSetters<?> expectTaskFetch(String taskId) {
+    return expect(taskStore.fetchTask(taskId)).andReturn(Optional.absent());
+  }
+
   public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
     return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
   }