You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/29 00:59:13 UTC

[2/2] aurora git commit: Add RemoveJobUpdates log Op, slim JobUpdateStore API

Add RemoveJobUpdates log Op, slim JobUpdateStore API

JobUpdateStore historically had granular APIs in the storage layer to
minimize unnecessary use of 'expensive' database objects.  The
in-memory store makes these 'free', so moving business logic out of
the storage layer is now both feasible performance-wise and pragmatic.

This patch also introduces the `RemoveJobUpdates` log `Op`, and
`PruneJobUpdateHistory` ops are now ignored during log replay.  In a future
release (and possibly before, with a feature flag), the scheduler will write
`RemoveJobUpdates` to the log.

LogStorage has always had the fundamental expectation that `Op`s are
idempotent.  The job update event `Op`s arguably violate this requirement, but
at minimum, explicit removal of updates is necessary for idempotency.

From LogStorage.java:

    This design implies that all mutations must be idempotent and free from
    constraint and thus replayable over newer operations when recovering
    from an old checkpoint.

Reviewed at https://reviews.apache.org/r/63884/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/284f40f5
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/284f40f5
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/284f40f5

Branch: refs/heads/master
Commit: 284f40f5e36c70114e6229fcb93e3b203d2f1120
Parents: 80139da
Author: Bill Farner <wf...@apache.org>
Authored: Tue Nov 28 16:58:47 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Nov 28 16:58:47 2017 -0800

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/storage.thrift |   5 +
 .../org/apache/aurora/benchmark/JobUpdates.java |   2 +-
 .../aurora/benchmark/UpdateStoreBenchmarks.java |  12 +-
 .../pruning/JobUpdateHistoryPruner.java         |  94 ++-
 .../aurora/scheduler/pruning/PruningModule.java |  11 +-
 .../scheduler/pruning/TaskHistoryPruner.java    |   8 +-
 .../aurora/scheduler/quota/QuotaManager.java    |  30 +-
 .../scheduler/storage/JobUpdateStore.java       |  83 +--
 .../scheduler/storage/log/LogStorage.java       |  10 +-
 .../storage/log/SnapshotStoreImpl.java          |   4 +-
 .../storage/log/WriteAheadStorage.java          |  55 +-
 .../storage/mem/MemJobUpdateStore.java          | 158 +----
 .../scheduler/thrift/ReadOnlySchedulerImpl.java |  21 +-
 .../updater/JobUpdateControllerImpl.java        |  84 ++-
 .../pruning/JobUpdateHistoryPrunerTest.java     | 170 +++--
 .../pruning/TaskHistoryPrunerTest.java          |   4 +-
 .../scheduler/quota/QuotaManagerImplTest.java   | 131 ++--
 .../storage/AbstractJobUpdateStoreTest.java     | 711 +++++--------------
 .../scheduler/storage/log/LogStorageTest.java   |  30 +-
 .../storage/log/WriteAheadStorageTest.java      |  28 +-
 .../thrift/ReadOnlySchedulerImplTest.java       |  23 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   8 +-
 22 files changed, 623 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
index c692a5f..2210497 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -58,6 +58,10 @@ struct SaveJobUpdate {
   // 2: deleted
 }
 
+struct RemoveJobUpdates {
+  1: set<api.JobUpdateKey> keys
+}
+
 struct StoredJobUpdateDetails {
   1: api.JobUpdateDetails details
   // 2: deleted
@@ -94,6 +98,7 @@ union Op {
   15: SaveJobUpdateEvent saveJobUpdateEvent
   16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
   17: PruneJobUpdateHistory pruneJobUpdateHistory
+  18: RemoveJobUpdates removeJobUpdate
 }
 
 // The current schema version ID.  This should be incremented each time the

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
index a5d1894..7557301 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/JobUpdates.java
@@ -63,7 +63,7 @@ final class JobUpdates {
     ImmutableSet.Builder<IJobUpdateKey> keyBuilder = ImmutableSet.builder();
     storage.write((Storage.MutateWork.NoResult.Quiet) store -> {
       JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
-      updateStore.deleteAllUpdatesAndEvents();
+      updateStore.deleteAllUpdates();
       for (IJobUpdateDetails details : updates) {
         IJobUpdateKey key = details.getUpdate().getSummary().getKey();
         keyBuilder.add(key);

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
index c98c514..e41db20 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/UpdateStoreBenchmarks.java
@@ -67,13 +67,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }
@@ -106,13 +106,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }
@@ -145,13 +145,13 @@ public class UpdateStoreBenchmarks {
     @TearDown(Level.Iteration)
     public void tearDownIteration() {
       storage.write((NoResult.Quiet) storeProvider -> {
-        storeProvider.getJobUpdateStore().deleteAllUpdatesAndEvents();
+        storeProvider.getJobUpdateStore().deleteAllUpdates();
       });
     }
 
     @Benchmark
     public IJobUpdateDetails run() throws TException {
-      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdateDetails(
+      return storage.read(store -> store.getJobUpdateStore().fetchJobUpdate(
           Iterables.getOnlyElement(keys)).get());
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
index b2768d9..05ada3c 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
@@ -13,39 +13,51 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.AbstractScheduledService;
 
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.storage.JobUpdateStore.TERMINAL_STATES;
+
 /**
  * Prunes per-job update history on a periodic basis.
  */
-class JobUpdateHistoryPruner extends AbstractIdleService {
+class JobUpdateHistoryPruner extends AbstractScheduledService {
   private static final Logger LOG = LoggerFactory.getLogger(JobUpdateHistoryPruner.class);
   @VisibleForTesting
   static final String JOB_UPDATES_PRUNED = "job_updates_pruned";
 
   private final Clock clock;
-  private final ScheduledExecutorService executor;
   private final Storage storage;
   private final HistoryPrunerSettings settings;
   private final AtomicLong prunedUpdatesCount;
@@ -69,38 +81,80 @@ class JobUpdateHistoryPruner extends AbstractIdleService {
   @Inject
   JobUpdateHistoryPruner(
       Clock clock,
-      ScheduledExecutorService executor,
       Storage storage,
       HistoryPrunerSettings settings,
       StatsProvider statsProvider) {
 
     this.clock = requireNonNull(clock);
-    this.executor = requireNonNull(executor);
     this.storage = requireNonNull(storage);
     this.settings = requireNonNull(settings);
     this.prunedUpdatesCount = statsProvider.makeCounter(JOB_UPDATES_PRUNED);
   }
 
   @Override
-  protected void startUp() {
-    executor.scheduleAtFixedRate(
-        () -> storage.write((NoResult.Quiet) storeProvider -> {
-          Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
-              settings.maxUpdatesPerJob,
-              clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
-
-          prunedUpdatesCount.addAndGet(prunedUpdates.size());
-          LOG.info(prunedUpdates.isEmpty()
-              ? "No job update history to prune."
-              : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
-        }),
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedDelaySchedule(
         settings.pruneInterval.as(Time.MILLISECONDS),
         settings.pruneInterval.as(Time.MILLISECONDS),
         TimeUnit.MILLISECONDS);
   }
 
+  @VisibleForTesting
+  void runForTest() {
+    runOneIteration();
+  }
+
+  @Timed("job_update_store_prune_history")
   @Override
-  protected void shutDown() {
-    // Nothing to do - await VM shutdown.
+  protected void runOneIteration() {
+    storage.write((NoResult.Quiet) storeProvider -> {
+
+      List<IJobUpdateSummary> completedUpdates = storeProvider.getJobUpdateStore()
+          .fetchJobUpdates(IJobUpdateQuery.build(
+              new JobUpdateQuery().setUpdateStatuses(TERMINAL_STATES)))
+          .stream()
+          .map(u -> u.getUpdate().getSummary())
+          .collect(Collectors.toList());
+
+      long cutoff = clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS);
+      Predicate<IJobUpdateSummary> expiredFilter =
+          s -> s.getState().getCreatedTimestampMs() < cutoff;
+
+      ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
+
+      // Gather updates based on time threshold.
+      pruneBuilder.addAll(completedUpdates
+          .stream()
+          .filter(expiredFilter)
+          .map(IJobUpdateSummary::getKey)
+          .collect(Collectors.toList()));
+
+      Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
+          // Avoid counting to-be-removed expired updates.
+          completedUpdates.stream().filter(expiredFilter.negate()).iterator(),
+          s -> s.getKey().getJob());
+
+      updatesByJob.asMap().values().forEach(updates -> {
+        if (updates.size() > settings.maxUpdatesPerJob) {
+          Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
+              .onResultOf(s -> s.getState().getCreatedTimestampMs());
+          pruneBuilder.addAll(creationOrder
+              .leastOf(updates, updates.size() - settings.maxUpdatesPerJob)
+              .stream()
+              .map(IJobUpdateSummary::getKey)
+              .iterator());
+        }
+      });
+
+      Set<IJobUpdateKey> pruned = pruneBuilder.build();
+      if (!pruned.isEmpty()) {
+        storeProvider.getJobUpdateStore().removeJobUpdates(pruned);
+      }
+
+      prunedUpdatesCount.addAndGet(pruned.size());
+      LOG.info(pruned.isEmpty()
+          ? "No job update history to prune."
+          : "Pruned job update history: " + Joiner.on(",").join(pruned));
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
index 4433b96..0ed22d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,11 +78,11 @@ public class PruningModule extends AbstractModule {
       protected void configure() {
         // TODO(ksweeney): Create a configuration validator module so this can be injected.
         // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
-        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
-            options.historyPruneThreshold,
-            options.historyMinRetentionThreshold,
-            options.historyMaxPerJobThreshold
-        ));
+        bind(TaskHistoryPruner.HistoryPrunerSettings.class).toInstance(
+            new TaskHistoryPruner.HistoryPrunerSettings(
+                options.historyPruneThreshold,
+                options.historyMinRetentionThreshold,
+                options.historyMaxPerJobThreshold));
 
         bind(TaskHistoryPruner.class).in(Singleton.class);
         expose(TaskHistoryPruner.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 3cafbc2..9aa51c3 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -65,7 +65,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final Clock clock;
-  private final HistoryPrunnerSettings settings;
+  private final HistoryPrunerSettings settings;
   private final Storage storage;
   private final Lifecycle lifecycle;
   private final TaskEventBatchWorker batchWorker;
@@ -79,12 +79,12 @@ public class TaskHistoryPruner implements EventSubscriber {
     }
   };
 
-  static class HistoryPrunnerSettings {
+  static class HistoryPrunerSettings {
     private final long pruneThresholdMillis;
     private final long minRetentionThresholdMillis;
     private final int perJobHistoryGoal;
 
-    HistoryPrunnerSettings(
+    HistoryPrunerSettings(
         Amount<Long, Time> inactivePruneThreshold,
         Amount<Long, Time> minRetentionThreshold,
         int perJobHistoryGoal) {
@@ -100,7 +100,7 @@ public class TaskHistoryPruner implements EventSubscriber {
       @AsyncExecutor ScheduledExecutorService executor,
       StateManager stateManager,
       Clock clock,
-      HistoryPrunnerSettings settings,
+      HistoryPrunerSettings settings,
       Storage storage,
       Lifecycle lifecycle,
       TaskEventBatchWorker batchWorker,

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 7f8c66c..64ad12b 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.quota;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -26,7 +27,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.RangeSet;
 
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -48,7 +47,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -274,8 +272,13 @@ public interface QuotaManager {
           .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
           .transform(IScheduledTask::getAssignedTask);
 
-      Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap(
-          fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role));
+      // Relies on the invariant of at-most-one active update per job.
+      Map<IJobKey, IJobUpdateInstructions> updates = storeProvider.getJobUpdateStore()
+          .fetchJobUpdates(updateQuery(role))
+          .stream()
+          .collect(Collectors.toMap(
+              u -> u.getUpdate().getSummary().getKey().getJob(),
+              u -> u.getUpdate().getInstructions()));
 
       // Mix in a requested job update (if present) to correctly calculate consumption.
       // This would be an update that is not saved in the store yet (i.e. the one quota is
@@ -398,20 +401,6 @@ public interface QuotaManager {
       };
     }
 
-    private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates(
-        final JobUpdateStore jobUpdateStore,
-        String role) {
-
-      Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
-          summary -> jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
-
-      return Maps.transformValues(
-          FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
-              .transform(fetchUpdate)
-              .uniqueIndex(UPDATE_TO_JOB_KEY),
-          IJobUpdate::getInstructions);
-    }
-
     @VisibleForTesting
     static IJobUpdateQuery updateQuery(String role) {
       return IJobUpdateQuery.build(new JobUpdateQuery()
@@ -474,9 +463,6 @@ public interface QuotaManager {
       return addAll(Iterables.transform(tasks, QUOTA_RESOURCES));
     }
 
-    private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
-        input -> input.getSummary().getKey().getJob();
-
     private static int getUpdateInstanceCount(Set<IRange> ranges) {
       int instanceCount = 0;
       for (IRange range : ranges) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index b3d906b..6b91d97 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -19,15 +19,14 @@ import java.util.Set;
 
 import com.google.common.base.Optional;
 
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
@@ -48,21 +47,15 @@ public interface JobUpdateStore {
       ERROR
   );
 
-  /**
-   * Fetches a read-only view of job update summaries.
-   *
-   * @param query Query to identify job update summaries with.
-   * @return A read-only view of job update summaries.
-   */
-  List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query);
+  IJobUpdateQuery MATCH_ALL = IJobUpdateQuery.build(new JobUpdateQuery());
 
   /**
    * Fetches a read-only view of job update details matching the {@code query}.
    *
    * @param query Query to identify job update details with.
-   * @return A read-only list view of job update details matching the query.
+   * @return A read-only view of job update details matching the query.
    */
-  List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query);
+  List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query);
 
   /**
    * Fetches a read-only view of job update details.
@@ -70,57 +63,12 @@ public interface JobUpdateStore {
    * @param key Update identifier.
    * @return A read-only view of job update details.
    */
-  Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of a job update.
-   *
-   * @param key Update identifier.
-   * @return A read-only view of job update.
-   */
-  Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of the instructions for a job update.
-   *
-   * @param key Update identifier.
-   * @return A read-only view of job update instructions.
-   */
-  Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key);
-
-  /**
-   * Fetches a read-only view of all job update details available in the store.
-   * TODO(wfarner): Generate immutable wrappers for storage.thrift structs, use an immutable object
-   *                here.
-   *
-   * @return A read-only view of all job update details.
-   */
-  Set<IJobUpdateDetails> fetchAllJobUpdateDetails();
-  /**
-   * Fetches the events that have affected an instance within a job update.
-   *
-   * @param key Update identifier.
-   * @param instanceId Instance to fetch events for.
-   * @return Instance events in {@code key} that affected {@code instanceId}.
-   */
-  List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId);
+  Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key);
 
   interface Mutable extends JobUpdateStore {
 
     /**
-     * Saves a new job update.
-     *
-     * <p>
-     * Note: This call must be followed by the
-     * {@link #saveJobUpdateEvent(IJobUpdateKey, IJobUpdateEvent)} before fetching a saved update as
-     * it does not save the following required fields:
-     * <ul>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#status}</li>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#createdTimestampMs}</li>
-     *   <li>{@link org.apache.aurora.gen.JobUpdateState#lastModifiedTimestampMs}</li>
-     * </ul>
-     * The above fields are auto-populated from the update events and any attempt to fetch an update
-     * without having at least one {@link IJobUpdateEvent} present in the store will return empty.
+     * Saves a job update.
      *
      * @param update Update to save.
      */
@@ -143,22 +91,15 @@ public interface JobUpdateStore {
     void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event);
 
     /**
-     * Deletes all updates and update events from the store.
+     * Deletes job updates.
+     *
+     * @param keys Keys of the updates to delete.
      */
-    void deleteAllUpdatesAndEvents();
+    void removeJobUpdates(Set<IJobUpdateKey> keys);
 
     /**
-     * Prunes (deletes) old completed updates and events from the store.
-     * <p>
-     * At least {@code perJobRetainCount} last completed updates that completed less than
-     * {@code historyPruneThreshold} ago will be kept for every job.
-     *
-     * @param perJobRetainCount Number of completed updates to retain per job.
-     * @param historyPruneThresholdMs Earliest timestamp in the past to retain history.
-     *                                Any completed updates created before this timestamp
-     *                                will be pruned.
-     * @return Set of pruned update keys.
+     * Deletes all updates from the store.
      */
-    Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
+    void deleteAllUpdates();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3ce2c7f..07b4bdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -365,9 +365,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
               IJobUpdateKey.build(event.getKey()),
               IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
         })
-        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> writeBehindJobUpdateStore.pruneHistory(
-            op.getPruneJobUpdateHistory().getPerJobRetainCount(),
-            op.getPruneJobUpdateHistory().getHistoryPruneThresholdMs())).build();
+        .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+          LOG.info("Dropping prune operation.  Updates will be pruned later.");
+        })
+        .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+          writeBehindJobUpdateStore.removeJobUpdates(
+              IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+        .build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 57c483b..5859f80 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -218,7 +218,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         @Override
         public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           snapshot.setJobUpdateDetails(
-              store.getJobUpdateStore().fetchAllJobUpdateDetails().stream()
+              store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
                   .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
                   .collect(Collectors.toSet()));
         }
@@ -227,7 +227,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
         public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
           if (snapshot.getJobUpdateDetailsSize() > 0) {
             JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
-            updateStore.deleteAllUpdatesAndEvents();
+            updateStore.deleteAllUpdates();
             for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
               JobUpdateDetails details = storedDetails.getDetails();
               updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 4d051fc..41061f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
 import org.apache.aurora.gen.storage.RemoveJob;
 import org.apache.aurora.gen.storage.RemoveQuota;
 import org.apache.aurora.gen.storage.RemoveTasks;
@@ -52,10 +51,8 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
@@ -237,21 +234,12 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
-    Set<IJobUpdateKey> prunedUpdates = jobUpdateStore.pruneHistory(
-        perJobRetainCount,
-        historyPruneThresholdMs);
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
 
-    if (!prunedUpdates.isEmpty()) {
-      // Pruned updates will eventually go away from persisted storage when a new snapshot is cut.
-      // So, persisting pruning attempts is not strictly necessary as the periodic pruner will
-      // provide eventual consistency between volatile and persistent storage upon scheduler
-      // restart. By generating an out of band pruning during log replay the consistency is
-      // achieved sooner without potentially exposing pruned but not yet persisted data.
-      write(Op.pruneJobUpdateHistory(
-          new PruneJobUpdateHistory(perJobRetainCount, historyPruneThresholdMs)));
-    }
-    return prunedUpdates;
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it.  JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
   }
 
   @Override
@@ -279,7 +267,7 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public void deleteAllUpdatesAndEvents() {
+  public void deleteAllUpdates() {
     throw new UnsupportedOperationException(
         "Unsupported since casual storage users should never be doing this.");
   }
@@ -370,37 +358,12 @@ class WriteAheadStorage implements
   }
 
   @Override
-  public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdateSummaries(query);
-  }
-
-  @Override
-  public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
-    return this.jobUpdateStore.fetchJobUpdateDetails(query);
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
   }
 
   @Override
-  public Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdateDetails(key);
-  }
-
-  @Override
-  public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
     return this.jobUpdateStore.fetchJobUpdate(key);
   }
-
-  @Override
-  public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) {
-    return this.jobUpdateStore.fetchJobUpdateInstructions(key);
-  }
-
-  @Override
-  public Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return this.jobUpdateStore.fetchAllJobUpdateDetails();
-  }
-
-  @Override
-  public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) {
-    return this.jobUpdateStore.fetchInstanceEvents(key, instanceId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
index 826cee9..f96ec08 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -14,58 +14,39 @@
 
 package org.apache.aurora.scheduler.storage.mem;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import javax.inject.Inject;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 
 import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdateAction;
 import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateEvent;
 import org.apache.aurora.gen.JobUpdateState;
-import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
-import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
-
 public class MemJobUpdateStore implements JobUpdateStore.Mutable {
 
   private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER = Ordering.natural()
@@ -73,88 +54,19 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
       .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
 
   private final Map<IJobUpdateKey, IJobUpdateDetails> updates = Maps.newConcurrentMap();
-  private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
-  private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
-
-  @Inject
-  public MemJobUpdateStore(StatsProvider statsProvider) {
-    this.jobUpdateEventStats = CacheBuilder.newBuilder()
-        .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
-          @Override
-          public AtomicLong load(JobUpdateStatus status) {
-            return statsProvider.makeCounter(jobUpdateStatusStatName(status));
-          }
-        });
-    for (JobUpdateStatus status : JobUpdateStatus.values()) {
-      jobUpdateEventStats.getUnchecked(status).get();
-    }
-    this.jobUpdateActionStats = CacheBuilder.newBuilder()
-        .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
-          @Override
-          public AtomicLong load(JobUpdateAction action) {
-            return statsProvider.makeCounter(jobUpdateActionStatName(action));
-          }
-        });
-    for (JobUpdateAction action : JobUpdateAction.values()) {
-      jobUpdateActionStats.getUnchecked(action).get();
-    }
-  }
 
-  @Timed("job_update_store_fetch_summaries")
+  @Timed("job_update_store_fetch_details_query")
   @Override
-  public synchronized List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
-    return performQuery(query)
-        .map(u -> u.getUpdate().getSummary())
-        .collect(Collectors.toList());
-  }
-
-  @Timed("job_update_store_fetch_details_list")
-  @Override
-  public synchronized List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
+  public synchronized List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
     return performQuery(query).collect(Collectors.toList());
   }
 
   @Timed("job_update_store_fetch_details")
   @Override
-  public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
+  public synchronized Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
     return Optional.fromNullable(updates.get(key));
   }
 
-  @Timed("job_update_store_fetch_update")
-  @Override
-  public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
-    return Optional.fromNullable(updates.get(key)).transform(IJobUpdateDetails::getUpdate);
-  }
-
-  @Timed("job_update_store_fetch_instructions")
-  @Override
-  public synchronized Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(
-      IJobUpdateKey key) {
-
-    return Optional.fromNullable(updates.get(key))
-        .transform(u -> u.getUpdate().getInstructions());
-  }
-
-  @Timed("job_update_store_fetch_all_details")
-  @Override
-  public synchronized Set<IJobUpdateDetails> fetchAllJobUpdateDetails() {
-    return ImmutableSet.copyOf(updates.values());
-  }
-
-  @Timed("job_update_store_fetch_instance_events")
-  @Override
-  public synchronized List<IJobInstanceUpdateEvent> fetchInstanceEvents(
-      IJobUpdateKey key,
-      int instanceId) {
-
-    return java.util.Optional.ofNullable(updates.get(key))
-        .map(IJobUpdateDetails::getInstanceEvents)
-        .orElse(ImmutableList.of())
-        .stream()
-        .filter(e -> e.getInstanceId() == instanceId)
-        .collect(Collectors.toList());
-  }
-
   private static void validateInstructions(IJobUpdateInstructions instructions) {
     if (!instructions.isSetDesiredState() && instructions.getInitialState().isEmpty()) {
       throw new IllegalArgumentException(
@@ -182,10 +94,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     requireNonNull(update);
     validateInstructions(update.getInstructions());
 
-    if (updates.containsKey(update.getSummary().getKey())) {
-      throw new StorageException("Update already exists: " + update.getSummary().getKey());
-    }
-
     JobUpdateDetails mutable = new JobUpdateDetails()
         .setUpdate(update.newBuilder())
         .setUpdateEvents(ImmutableList.of())
@@ -211,7 +119,6 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents()));
     mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
     updates.put(key, IJobUpdateDetails.build(mutable));
-    jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
   }
 
   private static final Ordering<JobInstanceUpdateEvent> INSTANCE_EVENT_ORDERING = Ordering.natural()
@@ -233,66 +140,23 @@ public class MemJobUpdateStore implements JobUpdateStore.Mutable {
     mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents()));
     mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
     updates.put(key, IJobUpdateDetails.build(mutable));
-    jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
   }
 
-  @Timed("job_update_store_delete_all")
+  @Timed("job_update_store_delete_updates")
   @Override
-  public synchronized void deleteAllUpdatesAndEvents() {
-    updates.clear();
+  public synchronized void removeJobUpdates(Set<IJobUpdateKey> key) {
+    requireNonNull(key);
+    updates.keySet().removeAll(key);
   }
 
-  @Timed("job_update_store_prune_history")
+  @Timed("job_update_store_delete_all")
   @Override
-  public synchronized Set<IJobUpdateKey> pruneHistory(
-      int perJobRetainCount,
-      long historyPruneThresholdMs) {
-
-    Supplier<Stream<IJobUpdateSummary>> completedUpdates = () -> updates.values().stream()
-        .map(u -> u.getUpdate().getSummary())
-        .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus()));
-
-    Predicate<IJobUpdateSummary> expiredFilter =
-        s -> s.getState().getCreatedTimestampMs() < historyPruneThresholdMs;
-
-    ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
-
-    // Gather updates based on time threshold.
-    pruneBuilder.addAll(completedUpdates.get()
-        .filter(expiredFilter)
-        .map(IJobUpdateSummary::getKey)
-        .collect(Collectors.toList()));
-
-    Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
-        // Avoid counting to-be-removed expired updates.
-        completedUpdates.get().filter(expiredFilter.negate()).iterator(),
-        s -> s.getKey().getJob());
-
-    for (Map.Entry<IJobKey, Collection<IJobUpdateSummary>> entry
-        : updatesByJob.asMap().entrySet()) {
-
-      if (entry.getValue().size() > perJobRetainCount) {
-        Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
-            .onResultOf(s -> s.getState().getCreatedTimestampMs());
-        pruneBuilder.addAll(creationOrder
-            .leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount)
-            .stream()
-            .map(IJobUpdateSummary::getKey)
-            .iterator());
-      }
-    }
-
-    Set<IJobUpdateKey> pruned = pruneBuilder.build();
-    updates.keySet().removeAll(pruned);
-
-    return pruned;
+  public synchronized void deleteAllUpdates() {
+    updates.clear();
   }
 
   private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) {
-    JobUpdateState state = update.getUpdate().getSummary().getState();
-    if (state == null) {
-      state = new JobUpdateState();
-    }
+    JobUpdateState state = new JobUpdateState();
 
     JobUpdateEvent firstEvent = Iterables.getFirst(update.getUpdateEvents(), null);
     if (firstEvent != null) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
index bba1161..9d327e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -308,11 +309,15 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
   @Override
   public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) {
     IJobUpdateQuery query = IJobUpdateQuery.build(requireNonNull(mutableQuery));
-    return ok(Result.getJobUpdateSummariesResult(
-        new GetJobUpdateSummariesResult()
-            .setUpdateSummaries(IJobUpdateSummary.toBuildersList(storage.read(
-                storeProvider ->
-                    storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(query))))));
+
+    List<IJobUpdateSummary> summaries = storage.read(
+        storeProvider -> storeProvider.getJobUpdateStore()
+            .fetchJobUpdates(query)
+            .stream()
+            .map(u -> u.getUpdate().getSummary()).collect(Collectors.toList()));
+
+    return ok(Result.getJobUpdateSummariesResult(new GetJobUpdateSummariesResult()
+        .setUpdateSummaries(IJobUpdateSummary.toBuildersList(summaries))));
   }
 
   @Override
@@ -324,8 +329,8 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     if (mutableQuery != null) {
       IJobUpdateQuery query = IJobUpdateQuery.build(mutableQuery);
 
-      List<IJobUpdateDetails> details = storage.read(storeProvider ->
-          storeProvider.getJobUpdateStore().fetchJobUpdateDetails(query));
+      List<IJobUpdateDetails> details =
+          storage.read(storeProvider -> storeProvider.getJobUpdateStore().fetchJobUpdates(query));
 
       return ok(Result.getJobUpdateDetailsResult(new GetJobUpdateDetailsResult()
           .setDetailsList(IJobUpdateDetails.toBuildersList(details))));
@@ -334,7 +339,7 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface {
     // TODO(zmanji): Remove this code once `mutableKey` is removed in AURORA-1765
     IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
     Optional<IJobUpdateDetails> details = storage.read(storeProvider ->
-        storeProvider.getJobUpdateStore().fetchJobUpdateDetails(key));
+        storeProvider.getJobUpdateStore().fetchJobUpdate(key));
 
     if (details.isPresent()) {
       return addMessage(ok(Result.getJobUpdateDetailsResult(

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index dc8d11c..87b18b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -13,17 +13,23 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -36,6 +42,7 @@ import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.JobInstanceUpdateEvent;
 import org.apache.aurora.gen.JobUpdateAction;
@@ -80,6 +87,8 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
 import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
 import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE;
@@ -123,6 +132,9 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final Map<IJobKey, UpdateFactory.Update> updates =
       Collections.synchronizedMap(Maps.newHashMap());
 
+  private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
+  private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
+
   @Inject
   JobUpdateControllerImpl(
       UpdateFactory updateFactory,
@@ -132,7 +144,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
       UpdateAgentReserver updateAgentReserver,
       Clock clock,
       Lifecycle lifecycle,
-      TaskEventBatchWorker batchWorker) {
+      TaskEventBatchWorker batchWorker,
+      StatsProvider statsProvider) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.storage = requireNonNull(storage);
@@ -143,6 +156,26 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.batchWorker = requireNonNull(batchWorker);
     this.pulseHandler = new PulseHandler(clock);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
+
+    this.jobUpdateEventStats = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateStatus status) {
+            return statsProvider.makeCounter(jobUpdateStatusStatName(status));
+          }
+        });
+    Arrays.stream(JobUpdateStatus.values())
+        .forEach(status -> jobUpdateEventStats.getUnchecked(status).get());
+
+    this.jobUpdateActionStats = CacheBuilder.newBuilder()
+        .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
+          @Override
+          public AtomicLong load(JobUpdateAction action) {
+            return statsProvider.makeCounter(jobUpdateActionStatName(action));
+          }
+        });
+    Arrays.stream(JobUpdateAction.values())
+        .forEach(action -> jobUpdateActionStats.getUnchecked(action).get());
   }
 
   @Override
@@ -164,8 +197,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
         throw new IllegalArgumentException("Update instruction is a no-op.");
       }
 
-      List<IJobUpdateSummary> activeJobUpdates =
-          storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryActiveByJob(job));
+      List<IJobUpdateDetails> activeJobUpdates =
+          storeProvider.getJobUpdateStore().fetchJobUpdates(queryActiveByJob(job));
       if (!activeJobUpdates.isEmpty()) {
         if (activeJobUpdates.size() > 1) {
           LOG.error("Multiple active updates exist for this job. {}", activeJobUpdates);
@@ -173,11 +206,11 @@ class JobUpdateControllerImpl implements JobUpdateController {
               String.format("Multiple active updates exist for this job. %s", activeJobUpdates));
         }
 
-        IJobUpdateSummary activeJobUpdate = activeJobUpdates.get(0);
+        IJobUpdateDetails activeUpdate = activeJobUpdates.stream().findFirst().get();
         throw new UpdateInProgressException("An active update already exists for this job, "
             + "please terminate it before starting another. "
             + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES,
-            activeJobUpdate);
+            activeUpdate.getUpdate().getSummary());
       }
 
       LOG.info("Starting update for job " + job);
@@ -202,7 +235,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     requireNonNull(job);
 
     if (storage.read(p -> !p.getJobUpdateStore()
-        .fetchJobUpdateSummaries(queryActiveByJob(job)).isEmpty())) {
+        .fetchJobUpdates(queryActiveByJob(job)).isEmpty())) {
 
       throw new JobUpdatingException("Job is currently updating");
     }
@@ -225,14 +258,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
     requireNonNull(auditData);
     LOG.info("Attempting to resume update " + key);
     storage.write((NoResult<UpdateStateException>) storeProvider -> {
-      IJobUpdateDetails details = Iterables.getOnlyElement(
-          storeProvider.getJobUpdateStore().fetchJobUpdateDetails(queryByUpdate(key)), null);
+      Optional<IJobUpdateDetails> details = storeProvider.getJobUpdateStore().fetchJobUpdate(key);
 
-      if (details == null) {
+      if (!details.isPresent()) {
         throw new UpdateStateException("Update does not exist: " + key);
       }
 
-      IJobUpdate update = details.getUpdate();
+      IJobUpdate update = details.get().getUpdate();
       IJobUpdateKey key1 = update.getSummary().getKey();
       Function<JobUpdateStatus, JobUpdateStatus> stateChange =
           isCoordinatedAndPulseExpired(key1, update.getInstructions())
@@ -294,7 +326,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   public void systemResume() {
     storage.write((NoResult.Quiet) storeProvider -> {
       for (IJobUpdateDetails details
-          : storeProvider.getJobUpdateStore().fetchJobUpdateDetails(ACTIVE_QUERY)) {
+          : storeProvider.getJobUpdateStore().fetchJobUpdates(ACTIVE_QUERY)) {
 
         IJobUpdateSummary summary = details.getUpdate().getSummary();
         IJobUpdateInstructions instructions = details.getUpdate().getInstructions();
@@ -396,7 +428,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery query) {
-    return Iterables.getOnlyElement(store.fetchJobUpdateSummaries(query));
+    return Iterables.getOnlyElement(store.fetchJobUpdates(query)).getUpdate().getSummary();
   }
 
   @VisibleForTesting
@@ -423,13 +455,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     storage.write((NoResult<UpdateStateException>) storeProvider -> {
 
-      IJobUpdateSummary update = Iterables.getOnlyElement(
-          storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdate(key)), null);
-      if (update == null) {
+      Optional<IJobUpdateDetails> update = storeProvider.getJobUpdateStore().fetchJobUpdate(key);
+      if (!update.isPresent()) {
         throw new UpdateStateException("Update does not exist " + key);
       }
 
-      changeUpdateStatus(storeProvider, update, stateChange.apply(update.getState().getStatus()));
+      IJobUpdateSummary summary = update.get().getUpdate().getSummary();
+      changeUpdateStatus(storeProvider, summary, stateChange.apply(summary.getState().getStatus()));
     });
   }
 
@@ -468,6 +500,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
       updateStore.saveJobUpdateEvent(
           key,
           IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
+      jobUpdateEventStats.getUnchecked(status).incrementAndGet();
     }
 
     if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
@@ -487,7 +520,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         checkState(!updates.containsKey(job), "Updater already exists for %s", job);
       }
 
-      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get();
+      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get().getUpdate();
       UpdateFactory.Update update;
       try {
         update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
@@ -556,7 +589,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
 
-    IJobUpdateInstructions instructions = updateStore.fetchJobUpdateInstructions(key).get();
+    IJobUpdateInstructions instructions = updateStore.fetchJobUpdate(key).get()
+        .getUpdate().getInstructions();
     if (isCoordinatedAndPulseExpired(key, instructions)) {
       // Move coordinated update into awaiting pulse state.
       JobUpdateStatus blockedStatus = getBlockedState(summary.getState().getStatus());
@@ -578,7 +612,11 @@ class JobUpdateControllerImpl implements JobUpdateController {
       Iterable<InstanceUpdateStatus> statusChanges;
 
       int instanceId = entry.getKey();
-      List<IJobInstanceUpdateEvent> savedEvents = updateStore.fetchInstanceEvents(key, instanceId);
+      List<IJobInstanceUpdateEvent> savedEvents = updateStore.fetchJobUpdate(key).get()
+          .getInstanceEvents()
+          .stream()
+          .filter(e -> e.getInstanceId() == instanceId)
+          .collect(Collectors.toList());
 
       Set<JobUpdateAction> savedActions =
           FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
@@ -609,6 +647,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
                   .setTimestampMs(clock.nowMillis())
                   .setAction(action));
           updateStore.saveJobInstanceUpdateEvent(summary.getKey(), event);
+          jobUpdateActionStats.getUnchecked(action).incrementAndGet();
         }
       }
     }
@@ -701,11 +740,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
               JobUpdateAction.INSTANCE_ROLLBACK_FAILED)
           .build();
 
-  @VisibleForTesting
-  static IJobUpdateQuery queryByUpdate(IJobUpdateKey key) {
-    return IJobUpdateQuery.build(new JobUpdateQuery().setKey(key.newBuilder()));
-  }
-
   private static JobUpdateEvent newEvent(JobUpdateStatus status) {
     return new JobUpdateEvent().setStatus(status);
   }
@@ -722,7 +756,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance),
         () -> storage.write((NoResult.Quiet) storeProvider -> {
           IJobUpdateSummary summary =
-              getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
+              storeProvider.getJobUpdateStore().fetchJobUpdate(key).get().getUpdate().getSummary();
           JobUpdateStatus status = summary.getState().getStatus();
           // Suppress this evaluation if the updater is not currently active.
           if (JobUpdateStateMachine.isActive(status)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
index 74db5ec..a1bf04a 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
@@ -13,67 +13,157 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.JOB_UPDATES_PRUNED;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
+import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.junit.Assert.assertEquals;
 
-public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+public class JobUpdateHistoryPrunerTest {
+
+  private Storage storage;
+
+  @Before
+  public void setUp() {
+    storage = MemStorageModule.newEmptyStorage();
+  }
+
   @Test
-  public void testExecution() throws Exception {
-    StorageTestUtil storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
+  public void testPruneHistory() {
+    IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
+
+    IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L, 123L);
+    IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L, 124L);
+    IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L, 125L);
+    IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L);
+    IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L, 123L);
+    IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L, 125L);
+    IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"), ROLLING_FORWARD, 126L, 126L);
 
-    final FakeStatsProvider statsProvider = new FakeStatsProvider();
+    long pruningThreshold = 120;
 
-    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
-    FakeScheduledExecutor executorClock =
-        FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+    // No updates pruned.
+    pruneHistory(3, pruningThreshold);
+    assertRetainedUpdates(update1, update2, update3, update4, update5, update6, update7);
 
-    Clock mockClock = createMock(Clock.class);
-    expect(mockClock.nowMillis()).andReturn(2L).times(2);
+    // 1 update pruned.
+    pruneHistory(2, pruningThreshold);
+    assertRetainedUpdates(update1, update3, update4, update5, update6, update7);
 
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
-        .andReturn(ImmutableSet.of(
-            IJobUpdateKey.build(
-                new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1"))));
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of());
+    // 2 updates pruned.
+    pruneHistory(1, pruningThreshold);
+    assertRetainedUpdates(update1, update4, update6, update7);
 
-    control.replay();
+    // The oldest update is pruned.
+    pruneHistory(1, 126);
+    assertRetainedUpdates(update1, update4, update7);
 
-    executorClock.assertEmpty();
+    // With a 0 per job count only the ROLLING_BACK and ROLLING_FORWARD updates remain.
+    pruneHistory(0, pruningThreshold);
+    assertRetainedUpdates(update1, update7);
+  }
+
+  private void pruneHistory(int retainCount, long pruningThresholdMs) {
+    FakeClock clock = new FakeClock();
+    clock.setNowMillis(100 + pruningThresholdMs);
     JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
-        mockClock,
-        executor,
-        storageUtil.storage,
+        clock,
+        storage,
         new HistoryPrunerSettings(
-            Amount.of(1L, Time.MILLISECONDS),
-            Amount.of(1L, Time.MILLISECONDS),
-            1),
-        statsProvider);
-
-    pruner.startAsync().awaitRunning();
-
-    assertEquals(0L, statsProvider.getValue(JOB_UPDATES_PRUNED));
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
-    assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
-    assertEquals(1L, statsProvider.getValue(JOB_UPDATES_PRUNED));
+            Amount.of(1L, Time.DAYS),
+            Amount.of(100L, Time.MILLISECONDS),
+            retainCount),
+        new FakeStatsProvider());
+    pruner.runForTest();
+  }
+
+  private void assertRetainedUpdates(IJobUpdateDetails... updates) {
+    storage.read(store -> {
+      assertEquals(
+          Stream.of(updates).map(u -> u.getUpdate().getSummary().getKey())
+              .collect(Collectors.toSet()),
+          store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
+              .map(u -> u.getUpdate().getSummary().getKey())
+              .collect(Collectors.toSet()));
+      return null;
+    });
+  }
+
+  private static IJobUpdateKey makeKey(String id) {
+    return makeKey(JOB, id);
+  }
+
+  private static IJobUpdateKey makeKey(IJobKey job, String id) {
+    return IJobUpdateKey.build(new JobUpdateKey().setJob(job.newBuilder()).setId(id));
+  }
+
+  private IJobUpdateDetails makeAndSave(
+      IJobUpdateKey key,
+      JobUpdateStatus status,
+      long createdMs,
+      long lastMs) {
+
+    IJobUpdateDetails update = IJobUpdateDetails.build(new JobUpdateDetails()
+        .setUpdateEvents(ImmutableList.of(
+            new JobUpdateEvent(status, lastMs)
+                .setUser("user")
+                .setMessage("message")
+        ))
+        .setInstanceEvents(ImmutableList.of())
+        .setUpdate(new JobUpdate()
+            .setInstructions(new JobUpdateInstructions()
+                .setDesiredState(new InstanceTaskConfig()
+                    .setTask(new TaskConfig())
+                    .setInstances(ImmutableSet.of(new Range()))))
+            .setSummary(new JobUpdateSummary()
+                .setKey(key.newBuilder())
+                .setState(new JobUpdateState()
+                    .setCreatedTimestampMs(createdMs)
+                    .setLastModifiedTimestampMs(lastMs)
+                    .setStatus(status)))));
+
+    storage.write((NoResult.Quiet) storeProvider -> {
+      JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore();
+      store.saveJobUpdate(update.getUpdate());
+      update.getUpdateEvents().forEach(event -> store.saveJobUpdateEvent(key, event));
+      update.getInstanceEvents().forEach(event -> store.saveJobInstanceUpdateEvent(key, event));
+    });
+    return update;
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 5e5c518..2c33c13 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunerSettings;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -92,7 +92,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         executor,
         stateManager,
         clock,
-        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+        new HistoryPrunerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage,
         new Lifecycle(shutdownCommand),
         batchWorker,

http://git-wip-us.apache.org/repos/asf/aurora/blob/284f40f5/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 6be4a9c..c1825f6 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.quota;
 
-import java.util.List;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -25,6 +23,7 @@ import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateInstructions;
 import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateSummary;
@@ -44,8 +43,8 @@ import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -480,16 +479,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().unsetDesiredState();
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().unsetDesiredState();
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -509,16 +503,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask(JOB_NAME, 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().setInitialState(ImmutableSet.of());
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().setInitialState(ImmutableSet.of());
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -538,16 +527,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(prodTask("foo", 2, 2, 2), prodTask("bar", 2, 2, 2)).times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(UPDATE_KEY);
-    IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
-    JobUpdate builder = update.newBuilder();
-    builder.getInstructions().unsetDesiredState();
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(config.getJob().getRole())))
-        .andReturn(summaries).times(2);
-
-    expect(jobUpdateStore.fetchJobUpdate(UPDATE_KEY))
-        .andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+    IJobUpdateDetails update = buildJobUpdate(UPDATE_KEY, config, 1, config, 1);
+    JobUpdateDetails builder = update.newBuilder();
+    builder.getUpdate().getInstructions().unsetDesiredState();
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(config.getJob().getRole())))
+        .andReturn(ImmutableList.of(IJobUpdateDetails.build(builder))).times(2);
 
     expectNoCronJobs().times(2);
 
@@ -571,8 +555,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(1, 1, 1, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -582,7 +566,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -596,8 +580,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
@@ -607,7 +591,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(4, 4, 4), EMPTY, bag(0, 0, 0), EMPTY),
@@ -624,8 +608,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectNoJobUpdates().times(2);
 
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
@@ -635,7 +619,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         new QuotaInfo(bag(6, 6, 6), bag(6, 6, 6), EMPTY, bag(0, 0, 0), EMPTY),
@@ -645,8 +629,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
   @Test
   public void testCheckQuotaNewUpdateSkippedForNonProdDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, false);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         taskConfig(2, 2, 2, true),
         1,
         config,
@@ -654,15 +638,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaNewUpdateSkippedForDedicatedDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, false);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         prodDedicatedTask("dedicatedJob", 1, 1, 1).getAssignedTask().getTask(),
         1,
         config,
@@ -670,20 +654,20 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update, storeProvider);
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update.getUpdate(), storeProvider);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
   @Test
   public void testCheckQuotaNewUpdateSkippedForEmptyDesiredState() {
     ITaskConfig config = taskConfig(2, 2, 2, true);
-    IJobUpdate update = buildJobUpdate(
-        buildJobUpdateSummaries(UPDATE_KEY).get(0),
+    IJobUpdateDetails update = buildJobUpdate(
+        UPDATE_KEY,
         config,
         1,
         config,
         1);
-    JobUpdate updateBuilder = update.newBuilder();
+    JobUpdate updateBuilder = update.getUpdate().newBuilder();
     updateBuilder.getInstructions().unsetDesiredState();
 
     control.replay();
@@ -844,50 +828,39 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
   private void expectJobUpdates(
       ITaskConfig initial,
-      int intialInstances,
+      int initialInstances,
       ITaskConfig desired,
       int desiredInstances,
       int times) {
 
     IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey(initial.getJob().newBuilder(), "u1"));
-    List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(key);
-    IJobUpdate update =
-        buildJobUpdate(summaries.get(0), initial, intialInstances, desired, desiredInstances);
-
-    expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(initial.getJob().getRole())))
-        .andReturn(summaries)
+    expect(jobUpdateStore.fetchJobUpdates(updateQuery(initial.getJob().getRole())))
+        .andReturn(ImmutableList.of(
+            buildJobUpdate(key, initial, initialInstances, desired, desiredInstances)))
         .times(times);
-
-    expect(jobUpdateStore.fetchJobUpdate(key)).andReturn(Optional.of(update)).times(times);
-
-  }
-
-  private List<IJobUpdateSummary> buildJobUpdateSummaries(IJobUpdateKey key) {
-    return ImmutableList.of(IJobUpdateSummary.build(
-        new JobUpdateSummary().setKey(key.newBuilder())));
   }
 
-  private IJobUpdate buildJobUpdate(
-      IJobUpdateSummary summary,
+  private IJobUpdateDetails buildJobUpdate(
+      IJobUpdateKey key,
       ITaskConfig initial,
       int intialInstances,
       ITaskConfig desired,
       int desiredInstances) {
 
-    return IJobUpdate.build(new JobUpdate()
-        .setSummary(summary.newBuilder())
-        .setInstructions(new JobUpdateInstructions()
-            .setDesiredState(new InstanceTaskConfig()
-                .setTask(desired.newBuilder())
-                .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
-            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
-                .setTask(initial.newBuilder())
-                .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1)))))));
+    return IJobUpdateDetails.build(new JobUpdateDetails()
+        .setUpdate(new JobUpdate()
+            .setSummary(new JobUpdateSummary().setKey(key.newBuilder()))
+            .setInstructions(new JobUpdateInstructions()
+                .setDesiredState(new InstanceTaskConfig()
+                    .setTask(desired.newBuilder())
+                    .setInstances(ImmutableSet.of(new Range(0, desiredInstances - 1))))
+                .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
+                    .setTask(initial.newBuilder())
+                    .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1))))))));
   }
 
   private IExpectationSetters<?> expectNoJobUpdates() {
-    return expect(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(ROLE)))
-        .andReturn(ImmutableList.of());
+    return expect(jobUpdateStore.fetchJobUpdates(updateQuery(ROLE))).andReturn(ImmutableList.of());
   }
 
   private IExpectationSetters<?> expectNoTasks() {