You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/10/03 20:17:20 UTC

[2/2] git commit: Implementing update history pruner.

Implementing update history pruner.

Bugs closed: AURORA-743

Reviewed at https://reviews.apache.org/r/26232/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9834b316
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9834b316
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9834b316

Branch: refs/heads/master
Commit: 9834b316a6f6d6cbcda6ffef733d8fe0d906464b
Parents: de47aba
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Oct 3 11:02:17 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Oct 3 11:02:17 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  42 +-
 .../aurora/scheduler/async/HistoryPruner.java   | 169 --------
 .../scheduler/async/JobUpdateHistoryPruner.java | 100 +++++
 .../scheduler/async/TaskHistoryPruner.java      | 169 ++++++++
 .../scheduler/storage/JobUpdateStore.java       |  14 +
 .../scheduler/storage/db/DBJobUpdateStore.java  |  21 +
 .../storage/db/JobUpdateDetailsMapper.java      |  40 +-
 .../storage/log/WriteAheadStorage.java          |  19 +
 .../storage/db/JobUpdateDetailsMapper.xml       |  83 +++-
 .../thrift/org/apache/aurora/gen/storage.thrift |   6 +
 .../scheduler/async/HistoryPrunerTest.java      | 397 -------------------
 .../async/JobUpdateHistoryPrunerTest.java       |  63 +++
 .../scheduler/async/TaskHistoryPrunerTest.java  | 397 +++++++++++++++++++
 .../storage/db/DBJobUpdateStoreTest.java        | 158 +++++++-
 .../scheduler/storage/log/LogStorageTest.java   |  27 ++
 .../testing/FakeScheduledExecutor.java          | 116 ++++++
 .../updater/FakeScheduledExecutor.java          |  87 ----
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   3 +-
 18 files changed, 1235 insertions(+), 676 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index aa45d27..c8f7f99 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -31,6 +31,8 @@ import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
+
+import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.args.constraints.NotNegative;
@@ -45,11 +47,11 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
 
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
-import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
+import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -150,6 +152,20 @@ public class AsyncModule extends AbstractModule {
       help = "Enable the preemptor and preemption")
   private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
 
+  @CmdLine(name = "job_update_history_per_job_threshold",
+      help = "Maximum number of completed job updates to retain in a job update history.")
+  private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
+
+  @CmdLine(name = "job_update_history_pruning_interval",
+      help = "Job update history pruning interval.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL =
+      Arg.create(Amount.of(15L, Time.MINUTES));
+
+  @CmdLine(name = "job_update_history_pruning_threshold",
+      help = "Time after which the scheduler will prune completed job update history.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD =
+      Arg.create(Amount.of(30L, Time.DAYS));
+
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
     @Override
     public Optional<String> findPreemptionSlotFor(
@@ -271,11 +287,11 @@ public class AsyncModule extends AbstractModule {
         ));
         bind(ScheduledExecutorService.class).toInstance(executor);
 
-        bind(HistoryPruner.class).in(Singleton.class);
-        expose(HistoryPruner.class);
+        bind(TaskHistoryPruner.class).in(Singleton.class);
+        expose(TaskHistoryPruner.class);
       }
     });
-    PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+    PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
 
     install(new PrivateModule() {
       @Override
@@ -299,6 +315,24 @@ public class AsyncModule extends AbstractModule {
         expose(GcExecutorLauncher.class);
       }
     });
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
+            new JobUpdateHistoryPruner.HistoryPrunerSettings(
+                JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),
+                JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(),
+                JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get()));
+
+        bind(ScheduledExecutorService.class).toInstance(
+            AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG));
+
+        bind(JobUpdateHistoryPruner.class).in(Singleton.class);
+        expose(JobUpdateHistoryPruner.class);
+      }
+    });
+    LifecycleModule.bindStartupAction(binder(), JobUpdateHistoryPruner.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
deleted file mode 100644
index ebae58a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-
-/**
- * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
- * transitioning into one of the inactive states.
- */
-public class HistoryPruner implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
-
-  private final ScheduledExecutorService executor;
-  private final StateManager stateManager;
-  private final Clock clock;
-  private final HistoryPrunnerSettings settings;
-  private final Storage storage;
-
-  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
-    @Override
-    public boolean apply(IScheduledTask task) {
-      return Tasks.getLatestEvent(task).getTimestamp()
-          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
-    }
-  };
-
-  static class HistoryPrunnerSettings {
-    private final long pruneThresholdMillis;
-    private final long minRetentionThresholdMillis;
-    private final int perJobHistoryGoal;
-
-    HistoryPrunnerSettings(
-        Amount<Long, Time> inactivePruneThreshold,
-        Amount<Long, Time> minRetentionThreshold,
-        int perJobHistoryGoal) {
-
-      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
-      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
-      this.perJobHistoryGoal = perJobHistoryGoal;
-    }
-  }
-
-  @Inject
-  HistoryPruner(
-      final ScheduledExecutorService executor,
-      final StateManager stateManager,
-      final Clock clock,
-      final HistoryPrunnerSettings settings,
-      final Storage storage) {
-
-    this.executor = requireNonNull(executor);
-    this.stateManager = requireNonNull(stateManager);
-    this.clock = requireNonNull(clock);
-    this.settings = requireNonNull(settings);
-    this.storage = requireNonNull(storage);
-  }
-
-  @VisibleForTesting
-  long calculateTimeout(long taskEventTimestampMillis) {
-    return Math.max(
-        settings.minRetentionThresholdMillis,
-        settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
-  }
-
-  /**
-   * When triggered, records an inactive task state change.
-   *
-   * @param change Event when a task changes state.
-   */
-  @Subscribe
-  public void recordStateChange(TaskStateChange change) {
-    if (Tasks.isTerminated(change.getNewState())) {
-      long timeoutBasis = change.isTransition()
-          ? clock.nowMillis()
-          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
-      registerInactiveTask(
-          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
-          change.getTaskId(),
-          calculateTimeout(timeoutBasis));
-    }
-  }
-
-  private void deleteTasks(Set<String> taskIds) {
-    LOG.info("Pruning inactive tasks " + taskIds);
-    stateManager.deleteTasks(taskIds);
-  }
-
-  @VisibleForTesting
-  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
-    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
-  }
-
-  private void registerInactiveTask(
-      final IJobKey jobKey,
-      final String taskId,
-      long timeRemaining) {
-
-    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    executor.schedule(
-        new Runnable() {
-          @Override
-          public void run() {
-            LOG.info("Pruning expired inactive task " + taskId);
-            deleteTasks(ImmutableSet.of(taskId));
-          }
-        },
-        timeRemaining,
-        TimeUnit.MILLISECONDS);
-
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        Collection<IScheduledTask> inactiveTasks =
-            Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
-        int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
-        if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
-          Set<String> toPrune = FluentIterable
-              .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
-              .filter(safeToDelete)
-              .limit(tasksToPrune)
-              .transform(Tasks.SCHEDULED_TO_ID)
-              .toSet();
-          deleteTasks(toPrune);
-        }
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
new file mode 100644
index 0000000..0b023a2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Joiner;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Prunes per-job update history on a periodic basis.
+ */
+class JobUpdateHistoryPruner implements Command {
+  private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName());
+
+  private final Clock clock;
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final HistoryPrunerSettings settings;
+
+  static class HistoryPrunerSettings {
+    private final Amount<Long, Time> pruneInterval;
+    private final Amount<Long, Time> maxHistorySize;
+    private final int maxUpdatesPerJob;
+
+    HistoryPrunerSettings(
+        Amount<Long, Time> pruneInterval,
+        Amount<Long, Time> maxHistorySize,
+        int maxUpdatesPerJob) {
+
+      this.pruneInterval = requireNonNull(pruneInterval);
+      this.maxHistorySize = requireNonNull(maxHistorySize);
+      this.maxUpdatesPerJob = maxUpdatesPerJob;
+    }
+  }
+
+  @Inject
+  JobUpdateHistoryPruner(
+      Clock clock,
+      ScheduledExecutorService executor,
+      Storage storage,
+      HistoryPrunerSettings settings) {
+
+    this.clock = requireNonNull(clock);
+    this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
+    this.settings = requireNonNull(settings);
+  }
+
+  @Override
+  public void execute() throws RuntimeException {
+    executor.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            storage.write(new MutateWork.NoResult.Quiet() {
+              @Override
+              public void execute(MutableStoreProvider storeProvider) {
+                Set<String> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
+                    settings.maxUpdatesPerJob,
+                    clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
+
+                LOG.info(prunedUpdates.isEmpty()
+                    ? "No job update history to prune."
+                    : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
+              }
+            });
+          }
+        },
+        settings.pruneInterval.as(Time.MILLISECONDS),
+        settings.pruneInterval.as(Time.MILLISECONDS),
+        TimeUnit.MILLISECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
new file mode 100644
index 0000000..345cd89
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
+ * transitioning into one of the inactive states.
+ */
+public class TaskHistoryPruner implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
+
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+  private final Clock clock;
+  private final HistoryPrunnerSettings settings;
+  private final Storage storage;
+
+  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      return Tasks.getLatestEvent(task).getTimestamp()
+          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
+    }
+  };
+
+  static class HistoryPrunnerSettings {
+    private final long pruneThresholdMillis;
+    private final long minRetentionThresholdMillis;
+    private final int perJobHistoryGoal;
+
+    HistoryPrunnerSettings(
+        Amount<Long, Time> inactivePruneThreshold,
+        Amount<Long, Time> minRetentionThreshold,
+        int perJobHistoryGoal) {
+
+      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
+      this.perJobHistoryGoal = perJobHistoryGoal;
+    }
+  }
+
+  @Inject
+  TaskHistoryPruner(
+      final ScheduledExecutorService executor,
+      final StateManager stateManager,
+      final Clock clock,
+      final HistoryPrunnerSettings settings,
+      final Storage storage) {
+
+    this.executor = requireNonNull(executor);
+    this.stateManager = requireNonNull(stateManager);
+    this.clock = requireNonNull(clock);
+    this.settings = requireNonNull(settings);
+    this.storage = requireNonNull(storage);
+  }
+
+  @VisibleForTesting
+  long calculateTimeout(long taskEventTimestampMillis) {
+    return Math.max(
+        settings.minRetentionThresholdMillis,
+        settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
+  }
+
+  /**
+   * When triggered, records an inactive task state change.
+   *
+   * @param change Event when a task changes state.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    if (Tasks.isTerminated(change.getNewState())) {
+      long timeoutBasis = change.isTransition()
+          ? clock.nowMillis()
+          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
+          change.getTaskId(),
+          calculateTimeout(timeoutBasis));
+    }
+  }
+
+  private void deleteTasks(Set<String> taskIds) {
+    LOG.info("Pruning inactive tasks " + taskIds);
+    stateManager.deleteTasks(taskIds);
+  }
+
+  @VisibleForTesting
+  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
+    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
+  }
+
+  private void registerInactiveTask(
+      final IJobKey jobKey,
+      final String taskId,
+      long timeRemaining) {
+
+    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
+    executor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            LOG.info("Pruning expired inactive task " + taskId);
+            deleteTasks(ImmutableSet.of(taskId));
+          }
+        },
+        timeRemaining,
+        TimeUnit.MILLISECONDS);
+
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        Collection<IScheduledTask> inactiveTasks =
+            Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+        int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
+        if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
+          Set<String> toPrune = FluentIterable
+              .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+              .filter(safeToDelete)
+              .limit(tasksToPrune)
+              .transform(Tasks.SCHEDULED_TO_ID)
+              .toSet();
+          deleteTasks(toPrune);
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index c3abffe..b7d8d52 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -133,5 +133,19 @@ public interface JobUpdateStore {
      * Deletes all updates and update events from the store.
      */
     void deleteAllUpdatesAndEvents();
+
+    /**
+     * Prunes (deletes) old completed updates and events from the store.
+     * <p>
+     * At least {@code perJobRetainCount} last completed updates that completed less than
+     * {@code historyPruneThreshold} ago will be kept for every job.
+     *
+     * @param perJobRetainCount Number of completed updates to retain per job.
+     * @param historyPruneThresholdMs Earliest timestamp in the past to retain history.
+     *                                Any completed updates created before this timestamp
+     *                                will be pruned.
+     * @return Set of pruned update IDs.
+     */
+    Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index 3db0114..d479d20 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -135,6 +135,27 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
     detailsMapper.truncate();
   }
 
+  @Timed("job_update_store_prune_history")
+  @Override
+  public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+    ImmutableSet.Builder<String> pruned = ImmutableSet.builder();
+
+    Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
+        perJobRetainCount,
+        historyPruneThresholdMs);
+
+    for (Long jobKeyId : jobKeyIdsToPrune) {
+      Set<String> pruneVictims = detailsMapper.selectPruneVictims(
+          jobKeyId,
+          perJobRetainCount,
+          historyPruneThresholdMs);
+
+      detailsMapper.deleteCompletedUpdates(pruneVictims);
+      pruned.addAll(pruneVictims);
+    }
+    return pruned.build();
+  }
+
   @Timed("job_update_store_fetch_summaries")
   @Override
   public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
index c583e08..60f5359 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
@@ -104,6 +104,40 @@ interface JobUpdateDetailsMapper {
   void truncate();
 
   /**
+   * Deletes all updates and events with update ID in {@code updateIds}.
+   *
+   * @param updateIds Update IDs to delete.
+   */
+  void deleteCompletedUpdates(@Param("updateIds") Set<String> updateIds);
+
+  /**
+   * Selects all distinct job key IDs associated with at least {@code perJobRetainCount} completed
+   * updates or updates completed before {@code historyPruneThresholdMs}.
+   *
+   * @param perJobRetainCount Number of updates to keep per job.
+   * @param historyPruneThresholdMs History pruning timestamp threshold.
+   * @return Job key IDs.
+   */
+  Set<Long> selectJobKeysForPruning(
+      @Param("retainCount") int perJobRetainCount,
+      @Param("pruneThresholdMs") long historyPruneThresholdMs);
+
+  /**
+   * Groups all updates without a job lock in reverse chronological order of their created times
+   * and deletes anything in excess of {@code perJobRetainCount} or older than
+   * {@code historyPruneThresholdMs}.
+   *
+   * @param jobKeyId Job key ID to select pruning victims for.
+   * @param perJobRetainCount Number of updates to keep per job.
+   * @param historyPruneThresholdMs History pruning timestamp threshold.
+   * @return Update IDs to prune.
+   */
+  Set<String> selectPruneVictims(
+      @Param("keyId") long jobKeyId,
+      @Param("retainCount") int perJobRetainCount,
+      @Param("pruneThresholdMs") long historyPruneThresholdMs);
+
+  /**
    * Gets all job update summaries matching the provided {@code query}.
    * All {@code query} fields are ANDed together.
    *
@@ -116,7 +150,7 @@ interface JobUpdateDetailsMapper {
    * Gets details for the provided {@code updateId}.
    *
    * @param updateId Update ID to get.
-   * @return job update details for the provided update ID, if it exists.
+   * @return Job update details for the provided update ID, if it exists.
    */
   @Nullable
   StoredJobUpdateDetails selectDetails(String updateId);
@@ -125,7 +159,7 @@ interface JobUpdateDetailsMapper {
    * Gets job update for the provided {@code updateId}.
    *
    * @param updateId Update ID to select by.
-   * @return job update for the provided update ID, if it exists.
+   * @return Job update for the provided update ID, if it exists.
    */
   @Nullable
   JobUpdate selectUpdate(String updateId);
@@ -134,7 +168,7 @@ interface JobUpdateDetailsMapper {
    * Gets job update instructions for the provided {@code updateId}.
    *
    * @param updateId Update ID to select by.
-   * @return job update instructions for the provided update ID, if it exists.
+   * @return Job update instructions for the provided update ID, if it exists.
    */
   @Nullable
   JobUpdateInstructions selectInstructions(String updateId);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 66c9164..094d1c6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -28,6 +28,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
 import org.apache.aurora.gen.storage.RemoveJob;
 import org.apache.aurora.gen.storage.RemoveLock;
 import org.apache.aurora.gen.storage.RemoveQuota;
@@ -308,6 +309,24 @@ class WriteAheadStorage extends ForwardingStore implements
   }
 
   @Override
+  public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+    Set<String> prunedUpdates = jobUpdateStore.pruneHistory(
+        perJobRetainCount,
+        historyPruneThresholdMs);
+
+    if (!prunedUpdates.isEmpty()) {
+      // Pruned updates will eventually go away from persisted storage when a new snapshot is cut.
+      // So, persisting pruning attempts is not strictly necessary as the periodic pruner will
+      // provide eventual consistency between volatile and persistent storage upon scheduler
+      // restart. By generating an out of band pruning during log replay the consistency is
+      // achieved sooner without potentially exposing pruned but not yet persisted data.
+      write(Op.pruneJobUpdateHistory(
+          new PruneJobUpdateHistory(perJobRetainCount, historyPruneThresholdMs)));
+    }
+    return prunedUpdates;
+  }
+
+  @Override
   public void deleteAllTasks() {
     throw new UnsupportedOperationException(
         "Unsupported since casual storage users should never be doing this.");

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index 2a81a94..77032d8 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -199,7 +199,7 @@
     </collection>
   </resultMap>
 
-  <sql id="timestamps_inner_joins">
+  <sql id="status_inner_join">
     INNER JOIN
     (
       SELECT
@@ -215,6 +215,9 @@
         GROUP BY update_id
       ) AS e_t ON e_t.update_id = e_s.update_id AND e_t.timestamp_ms = e_s.timestamp_ms
     ) AS max_status ON max_status.update_id = u.id
+  </sql>
+
+  <sql id="created_timestamp_inner_join">
     INNER JOIN
     (
       SELECT
@@ -223,6 +226,9 @@
       FROM job_update_events
       GROUP BY update_id
     ) AS min_ts ON min_ts.update_id = u.id
+  </sql>
+
+  <sql id="last_updated_timestamp_inner_join">
     INNER JOIN
     (
       SELECT
@@ -244,6 +250,12 @@
     ) AS max_ts ON max_ts.update_id = u.id
   </sql>
 
+  <sql id="timestamps_inner_joins">
+    <include refid="status_inner_join" />
+    <include refid="created_timestamp_inner_join" />
+    <include refid="last_updated_timestamp_inner_join" />
+  </sql>
+
   <select id="selectSummaries" resultMap="jobUpdateSummaryMap">
     SELECT
       u.update_id AS update_id,
@@ -320,8 +332,7 @@
       io.last AS jui_juse_r_last
   </sql>
 
-  <sql id="job_update_inner_joins">
-    FROM job_updates AS u
+  <sql id="job_key_inner_join">
     INNER JOIN job_keys AS j ON j.id = u.job_key_id
   </sql>
 
@@ -333,6 +344,10 @@
     LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_id = u.id
   </sql>
 
+  <sql id="lock_outer_join">
+    LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
+  </sql>
+
   <sql id="unscoped_details_select">
     SELECT
       <include refid="job_update_columns" />,
@@ -345,12 +360,13 @@
       i.instance_id AS i_instance_id,
       i.timestamp_ms AS i_timestamp_ms,
       l.lock_token AS lock_token
-    <include refid="job_update_inner_joins" />
+    FROM job_updates AS u
+    <include refid="job_key_inner_join" />
     <include refid="timestamps_inner_joins" />
     <include refid="job_update_outer_joins" />
     LEFT OUTER JOIN job_update_events AS e ON e.update_id = u.id
     LEFT OUTER JOIN job_instance_update_events AS i ON i.update_id = u.id
-    LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
+    <include refid="lock_outer_join" />
   </sql>
 
   <!--Ideally, update instruction columns could be derived from job_update_columns above but that
@@ -380,7 +396,8 @@
       io.id AS juse_r_id,
       io.first AS juse_r_first,
       io.last AS juse_r_last
-    <include refid="job_update_inner_joins" />
+    FROM job_updates AS u
+    <include refid="job_key_inner_join" />
     <include refid="job_update_outer_joins" />
     WHERE u.update_id = #{id}
   </select>
@@ -388,7 +405,8 @@
   <select id="selectUpdate" resultMap="jobUpdateMap">
     SELECT
       <include refid="job_update_columns" />
-    <include refid="job_update_inner_joins" />
+    FROM job_updates AS u
+    <include refid="job_key_inner_join" />
     <include refid="timestamps_inner_joins" />
     <include refid="job_update_outer_joins" />
     WHERE u.update_id = #{id}
@@ -427,4 +445,55 @@
   <delete id="truncate">
     DELETE FROM job_updates;
   </delete>
+
+  <select id="selectJobKeysForPruning" resultType="long">
+    SELECT DISTINCT
+      u.job_key_id
+    FROM job_updates as u
+    <include refid="created_timestamp_inner_join" />
+    <include refid="lock_outer_join" />
+    WHERE l.id IS NULL
+    GROUP BY u.job_key_id
+    HAVING COUNT(u.job_key_id) > #{retainCount}
+    UNION
+    SELECT DISTINCT
+      u.job_key_id
+    FROM job_updates as u
+    <include refid="created_timestamp_inner_join" />
+    <include refid="lock_outer_join" />
+    WHERE min_ts.timestamp_ms &lt; #{pruneThresholdMs} AND l.id IS NULL
+  </select>
+
+  <select id="selectPruneVictims" resultType="String">
+    SELECT id FROM
+    (
+      SELECT
+        u.update_id as id
+      FROM job_updates as u
+      <include refid="created_timestamp_inner_join" />
+      <include refid="lock_outer_join" />
+      WHERE u.job_key_id = #{keyId}
+        AND l.id IS NULL
+      ORDER BY min_ts.timestamp_ms DESC
+      LIMIT NULL
+      OFFSET #{retainCount}
+    )
+    UNION
+    SELECT
+      u.update_id as id
+    FROM job_updates as u
+    <include refid="created_timestamp_inner_join" />
+    <include refid="lock_outer_join" />
+    WHERE u.job_key_id = #{keyId}
+      AND min_ts.timestamp_ms &lt;= #{pruneThresholdMs}
+      AND l.id IS NULL
+  </select>
+
+  <delete id="deleteCompletedUpdates">
+    DELETE FROM job_updates
+    WHERE update_id IN
+    <foreach item="element" collection="updateIds" open="(" separator="," close=")">
+      #{element}
+    </foreach>
+  </delete>
 </mapper>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage.thrift b/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 7e50245..5350ec9 100644
--- a/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -89,6 +89,11 @@ struct SaveJobInstanceUpdateEvent {
   2: string updateId
 }
 
+struct PruneJobUpdateHistory {
+  1: i32 perJobRetainCount
+  2: i64 historyPruneThresholdMs
+}
+
 union Op {
   1: SaveFrameworkId saveFrameworkId
   2: SaveAcceptedJob saveAcceptedJob
@@ -104,6 +109,7 @@ union Op {
   14: SaveJobUpdate saveJobUpdate
   15: SaveJobUpdateEvent saveJobUpdateEvent
   16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
+  17: PruneJobUpdateHistory pruneJobUpdateHistory
 }
 
 // The current schema version ID.  This should be incremented each time the

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
deleted file mode 100644
index 011d9ec..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.fail;
-
-public class HistoryPrunerTest extends EasyMockTest {
-  private static final String JOB_A = "job-a";
-  private static final String TASK_ID = "task_id";
-  private static final String SLAVE_HOST = "HOST_A";
-  private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
-  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
-  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
-  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
-  private static final int PER_JOB_HISTORY = 2;
-
-  private ScheduledFuture<?> future;
-  private ScheduledExecutorService executor;
-  private FakeClock clock;
-  private StateManager stateManager;
-  private StorageTestUtil storageUtil;
-  private HistoryPruner pruner;
-
-  @Before
-  public void setUp() {
-    future = createMock(new Clazz<ScheduledFuture<?>>() { });
-    executor = createMock(ScheduledExecutorService.class);
-    clock = new FakeClock();
-    stateManager = createMock(StateManager.class);
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    pruner = new HistoryPruner(
-        executor,
-        stateManager,
-        clock,
-        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
-        storageUtil.storage);
-  }
-
-  @Test
-  public void testNoPruning() {
-    long taskATimestamp = clock.nowMillis();
-    IScheduledTask a = makeTask("a", FINISHED);
-
-    clock.advance(ONE_MS);
-    long taskBTimestamp = clock.nowMillis();
-    IScheduledTask b = makeTask("b", SANDBOX_DELETED);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectOneDelayedPrune(taskATimestamp);
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectOneDelayedPrune(taskBTimestamp);
-
-    control.replay();
-
-    pruner.recordStateChange(TaskStateChange.initialized(a));
-    pruner.recordStateChange(TaskStateChange.initialized(b));
-  }
-
-  @Test
-  public void testStorageStartedWithPruning() {
-    long taskATimestamp = clock.nowMillis();
-    IScheduledTask a = makeTask("a", SANDBOX_DELETED);
-
-    clock.advance(ONE_MINUTE);
-    long taskBTimestamp = clock.nowMillis();
-    IScheduledTask b = makeTask("b", LOST);
-
-    clock.advance(ONE_MINUTE);
-    long taskCTimestamp = clock.nowMillis();
-    IScheduledTask c = makeTask("c", FINISHED);
-
-    clock.advance(ONE_MINUTE);
-    IScheduledTask d = makeTask("d", FINISHED);
-    IScheduledTask e = makeTask("job-x", "e", FINISHED);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectOneDelayedPrune(taskATimestamp);
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectOneDelayedPrune(taskBTimestamp);
-    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
-    expectOneDelayedPrune(taskCTimestamp);
-    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(e));
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
-      pruner.recordStateChange(TaskStateChange.initialized(task));
-    }
-  }
-
-  @Test
-  public void testStateChange() {
-    IScheduledTask starting = makeTask("a", STARTING);
-    IScheduledTask running = copy(starting, RUNNING);
-    IScheduledTask killed = copy(starting, KILLED);
-
-    expectNoImmediatePrune(ImmutableSet.of(killed));
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    // No future set for non-terminal state transition.
-    changeState(starting, running);
-
-    // Future set for terminal state transition.
-    changeState(running, killed);
-  }
-
-  @Test
-  public void testActivateFutureAndExceedHistoryGoal() {
-    IScheduledTask running = makeTask("a", RUNNING);
-    IScheduledTask killed = copy(running, KILLED);
-    expectNoImmediatePrune(ImmutableSet.of(running));
-    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
-
-    // Expect task "a" to be pruned when future is activated.
-    expectDeleteTasks("a");
-
-    control.replay();
-
-    // Capture future for inactive task "a"
-    changeState(running, killed);
-    clock.advance(ONE_HOUR);
-    // Execute future to prune task "a" from the system.
-    delayedDelete.getValue().run();
-  }
-
-  @Test
-  public void testJobHistoryExceeded() {
-    IScheduledTask a = makeTask("a", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask aKilled = copy(a, KILLED);
-
-    IScheduledTask b = makeTask("b", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask bKilled = copy(b, KILLED);
-
-    IScheduledTask c = makeTask("c", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask cLost = copy(c, LOST);
-
-    IScheduledTask d = makeTask("d", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask dLost = copy(d, LOST);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
-    expectDefaultDelayedPrune();
-    clock.advance(ONE_HOUR);
-    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    changeState(a, aKilled);
-    changeState(b, bKilled);
-    changeState(c, cLost);
-    changeState(d, dLost);
-  }
-
-  // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
-  // are not used, it is rather awkward to test this.
-  @Test
-  public void testThreadSafeStateChangeEvent() throws Exception {
-    // This tests against regression where an executor pruning a task holds an intrinsic lock and
-    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
-    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
-    // fired.
-
-    pruner = prunerWithRealExecutor();
-    Command onDeleted = new Command() {
-      @Override
-      public void execute() {
-        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        IScheduledTask b = makeTask("b", ASSIGNED);
-
-        changeState(b, STARTING);
-      }
-    };
-    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
-
-    control.replay();
-
-    // Change the task to a terminal state and wait for it to be pruned.
-    changeState(makeTask(TASK_ID, RUNNING), KILLED);
-    taskDeleted.await();
-  }
-
-  private HistoryPruner prunerWithRealExecutor() {
-    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("testThreadSafeEvents-executor")
-            .build());
-    return new HistoryPruner(
-        realExecutor,
-        stateManager,
-        clock,
-        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
-        storageUtil.storage);
-  }
-
-  private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
-    final CountDownLatch deleteCalled = new CountDownLatch(1);
-    final CountDownLatch eventDelivered = new CountDownLatch(1);
-
-    Thread eventDispatch = new Thread() {
-      @Override
-      public void run() {
-        try {
-          deleteCalled.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          fail("Interrupted while awaiting for delete call.");
-          return;
-        }
-        onDelete.execute();
-        eventDelivered.countDown();
-      }
-    };
-    eventDispatch.setDaemon(true);
-    eventDispatch.setName(getClass().getName() + "-EventDispatch");
-    eventDispatch.start();
-
-    stateManager.deleteTasks(ImmutableSet.of(taskId));
-    expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override
-      public Void answer() {
-        deleteCalled.countDown();
-        try {
-          eventDelivered.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          fail("Interrupted while awaiting for event delivery.");
-        }
-        return null;
-      }
-    });
-
-    return eventDelivered;
-  }
-
-  private void expectDeleteTasks(String... tasks) {
-    stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
-  }
-
-  private Capture<Runnable> expectDefaultDelayedPrune() {
-    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
-  }
-
-  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
-    return expectDelayedPrune(timestampMillis, 1);
-  }
-
-  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
-    expectImmediatePrune(tasksInJob);
-  }
-
-  private void expectImmediatePrune(
-      ImmutableSet<IScheduledTask> tasksInJob,
-      IScheduledTask... pruned) {
-
-    // Expect a deferred prune operation when a new task is being watched.
-    executor.submit(EasyMock.<Runnable>anyObject());
-    expectLastCall().andAnswer(
-        new IAnswer<Future<?>>() {
-          @Override
-          public Future<?> answer() {
-            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
-            work.run();
-            return null;
-          }
-        }
-    );
-
-    IJobKey jobKey = Iterables.getOnlyElement(
-        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
-    storageUtil.expectTaskFetch(HistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
-    if (pruned.length > 0) {
-      stateManager.deleteTasks(Tasks.ids(pruned));
-    }
-  }
-
-  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        EasyMock.capture(capture),
-        eq(pruner.calculateTimeout(timestampMillis)),
-        eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(future).times(count);
-    return capture;
-  }
-
-  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
-    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
-  }
-
-  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
-    pruner.recordStateChange(
-        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
-  }
-
-  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
-    return IScheduledTask.build(task.newBuilder().setStatus(status));
-  }
-
-  private IScheduledTask makeTask(
-      String job,
-      String taskId,
-      ScheduleStatus status) {
-
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
-        .setAssignedTask(makeAssignedTask(job, taskId)));
-  }
-
-  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
-    return makeTask(JOB_A, taskId, status);
-  }
-
-  private AssignedTask makeAssignedTask(String job, String taskId) {
-    return new AssignedTask()
-        .setSlaveHost(SLAVE_HOST)
-        .setTaskId(taskId)
-        .setTask(new TaskConfig()
-            .setOwner(new Identity().setRole("role").setUser("user"))
-            .setEnvironment("staging45")
-            .setJobName(job)
-            .setExecutorConfig(new ExecutorConfig("aurora", "config")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
new file mode 100644
index 0000000..748aac8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.async.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+  @Test
+  public void testExecution() throws Exception {
+    StorageTestUtil storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+
+    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+    FakeScheduledExecutor executorClock =
+        FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+
+    Clock mockClock = createMock(Clock.class);
+    expect(mockClock.nowMillis()).andReturn(2L).times(2);
+
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of("id1", "id2"));
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.<String>of());
+
+    control.replay();
+
+    executorClock.assertEmpty();
+    JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
+        mockClock,
+        executor,
+        storageUtil.storage,
+        new HistoryPrunerSettings(
+            Amount.of(1L, Time.MILLISECONDS),
+            Amount.of(1L, Time.MILLISECONDS),
+            1));
+
+    pruner.execute();
+    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
new file mode 100644
index 0000000..53d2c6b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+public class TaskHistoryPrunerTest extends EasyMockTest {
+  private static final String JOB_A = "job-a";
+  private static final String TASK_ID = "task_id";
+  private static final String SLAVE_HOST = "HOST_A";
+  private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
+  private static final int PER_JOB_HISTORY = 2;
+
+  private ScheduledFuture<?> future;
+  private ScheduledExecutorService executor;
+  private FakeClock clock;
+  private StateManager stateManager;
+  private StorageTestUtil storageUtil;
+  private TaskHistoryPruner pruner;
+
+  @Before
+  public void setUp() {
+    future = createMock(new Clazz<ScheduledFuture<?>>() { });
+    executor = createMock(ScheduledExecutorService.class);
+    clock = new FakeClock();
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    pruner = new TaskHistoryPruner(
+        executor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  @Test
+  public void testNoPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", FINISHED);
+
+    clock.advance(ONE_MS);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", SANDBOX_DELETED);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+
+    control.replay();
+
+    pruner.recordStateChange(TaskStateChange.initialized(a));
+    pruner.recordStateChange(TaskStateChange.initialized(b));
+  }
+
+  @Test
+  public void testStorageStartedWithPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", SANDBOX_DELETED);
+
+    clock.advance(ONE_MINUTE);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", LOST);
+
+    clock.advance(ONE_MINUTE);
+    long taskCTimestamp = clock.nowMillis();
+    IScheduledTask c = makeTask("c", FINISHED);
+
+    clock.advance(ONE_MINUTE);
+    IScheduledTask d = makeTask("d", FINISHED);
+    IScheduledTask e = makeTask("job-x", "e", FINISHED);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectOneDelayedPrune(taskCTimestamp);
+    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(e));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+      pruner.recordStateChange(TaskStateChange.initialized(task));
+    }
+  }
+
+  @Test
+  public void testStateChange() {
+    IScheduledTask starting = makeTask("a", STARTING);
+    IScheduledTask running = copy(starting, RUNNING);
+    IScheduledTask killed = copy(starting, KILLED);
+
+    expectNoImmediatePrune(ImmutableSet.of(killed));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    // No future set for non-terminal state transition.
+    changeState(starting, running);
+
+    // Future set for terminal state transition.
+    changeState(running, killed);
+  }
+
+  @Test
+  public void testActivateFutureAndExceedHistoryGoal() {
+    IScheduledTask running = makeTask("a", RUNNING);
+    IScheduledTask killed = copy(running, KILLED);
+    expectNoImmediatePrune(ImmutableSet.of(running));
+    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
+
+    // Expect task "a" to be pruned when future is activated.
+    expectDeleteTasks("a");
+
+    control.replay();
+
+    // Capture future for inactive task "a"
+    changeState(running, killed);
+    clock.advance(ONE_HOUR);
+    // Execute future to prune task "a" from the system.
+    delayedDelete.getValue().run();
+  }
+
+  @Test
+  public void testJobHistoryExceeded() {
+    IScheduledTask a = makeTask("a", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask aKilled = copy(a, KILLED);
+
+    IScheduledTask b = makeTask("b", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask bKilled = copy(b, KILLED);
+
+    IScheduledTask c = makeTask("c", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask cLost = copy(c, LOST);
+
+    IScheduledTask d = makeTask("d", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask dLost = copy(d, LOST);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
+    expectDefaultDelayedPrune();
+    clock.advance(ONE_HOUR);
+    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    changeState(a, aKilled);
+    changeState(b, bKilled);
+    changeState(c, cLost);
+    changeState(d, dLost);
+  }
+
+  // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
+  // are not used, it is rather awkward to test this.
+  @Test
+  public void testThreadSafeStateChangeEvent() throws Exception {
+    // This tests against regression where an executor pruning a task holds an intrinsic lock and
+    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
+    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
+    // fired.
+
+    pruner = prunerWithRealExecutor();
+    Command onDeleted = new Command() {
+      @Override
+      public void execute() {
+        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+        IScheduledTask b = makeTask("b", ASSIGNED);
+
+        changeState(b, STARTING);
+      }
+    };
+    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+
+    control.replay();
+
+    // Change the task to a terminal state and wait for it to be pruned.
+    changeState(makeTask(TASK_ID, RUNNING), KILLED);
+    taskDeleted.await();
+  }
+
+  private TaskHistoryPruner prunerWithRealExecutor() {
+    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("testThreadSafeEvents-executor")
+            .build());
+    return new TaskHistoryPruner(
+        realExecutor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
+    final CountDownLatch deleteCalled = new CountDownLatch(1);
+    final CountDownLatch eventDelivered = new CountDownLatch(1);
+
+    Thread eventDispatch = new Thread() {
+      @Override
+      public void run() {
+        try {
+          deleteCalled.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail("Interrupted while awaiting for delete call.");
+          return;
+        }
+        onDelete.execute();
+        eventDelivered.countDown();
+      }
+    };
+    eventDispatch.setDaemon(true);
+    eventDispatch.setName(getClass().getName() + "-EventDispatch");
+    eventDispatch.start();
+
+    stateManager.deleteTasks(ImmutableSet.of(taskId));
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() {
+        deleteCalled.countDown();
+        try {
+          eventDelivered.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail("Interrupted while awaiting for event delivery.");
+        }
+        return null;
+      }
+    });
+
+    return eventDelivered;
+  }
+
+  private void expectDeleteTasks(String... tasks) {
+    stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
+  }
+
+  private Capture<Runnable> expectDefaultDelayedPrune() {
+    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
+  }
+
+  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+    return expectDelayedPrune(timestampMillis, 1);
+  }
+
+  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+    expectImmediatePrune(tasksInJob);
+  }
+
+  private void expectImmediatePrune(
+      ImmutableSet<IScheduledTask> tasksInJob,
+      IScheduledTask... pruned) {
+
+    // Expect a deferred prune operation when a new task is being watched.
+    executor.submit(EasyMock.<Runnable>anyObject());
+    expectLastCall().andAnswer(
+        new IAnswer<Future<?>>() {
+          @Override
+          public Future<?> answer() {
+            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+            work.run();
+            return null;
+          }
+        }
+    );
+
+    IJobKey jobKey = Iterables.getOnlyElement(
+        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+    storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+    if (pruned.length > 0) {
+      stateManager.deleteTasks(Tasks.ids(pruned));
+    }
+  }
+
+  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        EasyMock.capture(capture),
+        eq(pruner.calculateTimeout(timestampMillis)),
+        eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andReturn(future).times(count);
+    return capture;
+  }
+
+  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
+  }
+
+  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+    pruner.recordStateChange(
+        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
+  }
+
+  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+    return IScheduledTask.build(task.newBuilder().setStatus(status));
+  }
+
+  private IScheduledTask makeTask(
+      String job,
+      String taskId,
+      ScheduleStatus status) {
+
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
+        .setAssignedTask(makeAssignedTask(job, taskId)));
+  }
+
+  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return makeTask(JOB_A, taskId, status);
+  }
+
+  private AssignedTask makeAssignedTask(String job, String taskId) {
+    return new AssignedTask()
+        .setSlaveHost(SLAVE_HOST)
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setOwner(new Identity().setRole("role").setUser("user"))
+            .setEnvironment("staging45")
+            .setJobName(job)
+            .setExecutorConfig(new ExecutorConfig("aurora", "config")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index dbf0bad..3871dae 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -65,7 +65,9 @@ import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATED;
 import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATING;
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED;
@@ -383,6 +385,116 @@ public class DBJobUpdateStoreTest {
     assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
   }
 
+  @Test
+  public void testPruneHistory() {
+    String updateId1 = "u11";
+    String updateId2 = "u12";
+    String updateId3 = "u13";
+    String updateId4 = "u14";
+    String updateId5 = "u15";
+    String updateId6 = "u16";
+    String updateId7 = "u17";
+
+    IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
+    IJobUpdate update1 = makeJobUpdate(JOB, updateId1);
+    IJobUpdate update2 = makeJobUpdate(JOB, updateId2);
+    IJobUpdate update3 = makeJobUpdate(JOB, updateId3);
+    IJobUpdate update4 = makeJobUpdate(JOB, updateId4);
+    IJobUpdate update5 = makeJobUpdate(job2, updateId5);
+    IJobUpdate update6 = makeJobUpdate(job2, updateId6);
+    IJobUpdate update7 = makeJobUpdate(job2, updateId7);
+
+    IJobUpdateEvent updateEvent1 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_BACK, 123L));
+    IJobUpdateEvent updateEvent2 = IJobUpdateEvent.build(new JobUpdateEvent(ABORTED, 124L));
+    IJobUpdateEvent updateEvent3 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLED_BACK, 125L));
+    IJobUpdateEvent updateEvent4 = IJobUpdateEvent.build(new JobUpdateEvent(FAILED, 126L));
+    IJobUpdateEvent updateEvent5 = IJobUpdateEvent.build(new JobUpdateEvent(ERROR, 123L));
+    IJobUpdateEvent updateEvent6 = IJobUpdateEvent.build(new JobUpdateEvent(FAILED, 125L));
+    IJobUpdateEvent updateEvent7 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_FORWARD, 126L));
+
+    update1 = populateExpected(
+        saveUpdateNoEvent(update1, Optional.of("lock1")), ROLLING_BACK, 123L, 123L);
+    update2 = populateExpected(
+        saveUpdateNoEvent(update2, Optional.<String>absent()), ABORTED, 124L, 124L);
+    update3 = populateExpected(
+        saveUpdateNoEvent(update3, Optional.<String>absent()), ROLLED_BACK, 125L, 125L);
+    update4 = populateExpected(
+        saveUpdateNoEvent(update4, Optional.<String>absent()), FAILED, 126L, 126L);
+    update5 = populateExpected(
+        saveUpdateNoEvent(update5, Optional.<String>absent()), ERROR, 123L, 123L);
+    update6 = populateExpected(
+        saveUpdateNoEvent(update6, Optional.<String>absent()), FAILED, 125L, 125L);
+    update7 = populateExpected(
+        saveUpdateNoEvent(update7, Optional.of("lock2")), ROLLING_FORWARD, 126L, 126L);
+
+    saveJobEvent(updateEvent1, updateId1);
+    saveJobEvent(updateEvent2, updateId2);
+    saveJobEvent(updateEvent3, updateId3);
+    saveJobEvent(updateEvent4, updateId4);
+    saveJobEvent(updateEvent5, updateId5);
+    saveJobEvent(updateEvent6, updateId6);
+    saveJobEvent(updateEvent7, updateId7);
+
+    assertEquals(update1, getUpdate(updateId1).get());
+    assertEquals(update2, getUpdate(updateId2).get());
+    assertEquals(update3, getUpdate(updateId3).get());
+    assertEquals(update4, getUpdate(updateId4).get());
+    assertEquals(update5, getUpdate(updateId5).get());
+    assertEquals(update6, getUpdate(updateId6).get());
+    assertEquals(update7, getUpdate(updateId7).get());
+
+    long pruningThreshold = 120L;
+
+    // No updates pruned.
+    assertEquals(ImmutableSet.<String>of(), pruneHistory(3, pruningThreshold));
+    assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+    assertEquals(Optional.of(update6), getUpdate(updateId6));
+    assertEquals(Optional.of(update5), getUpdate(updateId5));
+
+    assertEquals(Optional.of(update4), getUpdate(updateId4));
+    assertEquals(Optional.of(update3), getUpdate(updateId3));
+    assertEquals(Optional.of(update2), getUpdate(updateId2));
+    assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+    assertEquals(ImmutableSet.of(updateId2), pruneHistory(2, pruningThreshold));
+    // No updates pruned.
+    assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+    assertEquals(Optional.of(update6), getUpdate(updateId6));
+    assertEquals(Optional.of(update5), getUpdate(updateId5));
+
+    // 1 update pruned.
+    assertEquals(Optional.of(update4), getUpdate(updateId4));
+    assertEquals(Optional.of(update3), getUpdate(updateId3));
+    assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId2));
+    assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+    assertEquals(ImmutableSet.of(updateId5, updateId3), pruneHistory(1, pruningThreshold));
+    // 1 update pruned.
+    assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+    assertEquals(Optional.of(update6), getUpdate(updateId6));
+    assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId5));
+
+    // 2 updates pruned.
+    assertEquals(Optional.of(update4), getUpdate(updateId4));
+    assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId3));
+    assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+    // The oldest update is pruned.
+    assertEquals(ImmutableSet.of(updateId6), pruneHistory(1, 126L));
+    assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+    assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId6));
+
+    assertEquals(Optional.of(update4), getUpdate(updateId4));
+    assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+    // Nothing survives the 0 per job count.
+    assertEquals(ImmutableSet.of(updateId4), pruneHistory(0, pruningThreshold));
+    assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+
+    assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId4));
+    assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+  }
+
   @Test(expected = StorageException.class)
   public void testSaveUpdateWithoutLock() {
     final IJobUpdate update = makeJobUpdate(JOB, "updateId");
@@ -414,13 +526,7 @@ public class DBJobUpdateStoreTest {
     final IJobUpdate update = makeJobUpdate(JOB, "update1");
     saveUpdate(update, Optional.of("lock1"));
 
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getLockStore().removeLock(
-            makeLock(update.getSummary().getJobKey(), "lock1").getKey());
-      }
-    });
+    removeLock(update, "lock1");
 
     assertEquals(
         Optional.of(updateJobDetails(populateExpected(update), FIRST_EVENT)),
@@ -614,7 +720,7 @@ public class DBJobUpdateStoreTest {
         .setUser("fake user"));
   }
 
-  private void saveUpdate(final IJobUpdate update, final Optional<String> lockToken) {
+  private IJobUpdate saveUpdate(final IJobUpdate update, final Optional<String> lockToken) {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
@@ -628,6 +734,23 @@ public class DBJobUpdateStoreTest {
             update.getSummary().getUpdateId());
       }
     });
+
+    return update;
+  }
+
+  private IJobUpdate saveUpdateNoEvent(final IJobUpdate update, final Optional<String> lockToken) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        if (lockToken.isPresent()) {
+          storeProvider.getLockStore().saveLock(
+              makeLock(update.getSummary().getJobKey(), lockToken.get()));
+        }
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
+      }
+    });
+
+    return update;
   }
 
   private void saveJobEvent(final IJobUpdateEvent event, final String updateId) {
@@ -657,6 +780,25 @@ public class DBJobUpdateStoreTest {
     });
   }
 
+  private Set<String> pruneHistory(final int retainCount, final long pruningThresholdMs) {
+    return storage.write(new MutateWork.Quiet<Set<String>>() {
+      @Override
+      public Set<String> apply(MutableStoreProvider storeProvider) {
+        return storeProvider.getJobUpdateStore().pruneHistory(retainCount, pruningThresholdMs);
+      }
+    });
+  }
+
+  private void removeLock(final IJobUpdate update, final String lockToken) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getLockStore().removeLock(
+            makeLock(update.getSummary().getJobKey(), lockToken).getKey());
+      }
+    });
+  }
+
   private IJobUpdate populateExpected(IJobUpdate update) {
     return populateExpected(update, ROLLING_FORWARD, CREATED_MS, CREATED_MS);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 68df0d5..7a8c3b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -55,6 +55,7 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
 import org.apache.aurora.gen.storage.RemoveJob;
 import org.apache.aurora.gen.storage.RemoveLock;
 import org.apache.aurora.gen.storage.RemoveQuota;
@@ -854,6 +855,32 @@ public class LogStorageTest extends EasyMockTest {
     }.run();
   }
 
+  @Test
+  public void testPruneHistory() throws Exception {
+    final PruneJobUpdateHistory pruneHistory = new PruneJobUpdateHistory()
+        .setHistoryPruneThresholdMs(1L)
+        .setPerJobRetainCount(1);
+
+    new MutationFixture() {
+      @Override
+      protected void setupExpectations() throws Exception {
+        storageUtil.expectWriteOperation();
+        expect(storageUtil.jobUpdateStore.pruneHistory(
+            pruneHistory.getPerJobRetainCount(),
+            pruneHistory.getHistoryPruneThresholdMs())).andReturn(ImmutableSet.of("id1"));
+
+        streamMatcher.expectTransaction(Op.pruneJobUpdateHistory(pruneHistory)).andReturn(position);
+      }
+
+      @Override
+      protected void performMutations(MutableStoreProvider storeProvider) {
+        storeProvider.getJobUpdateStore().pruneHistory(
+            pruneHistory.getPerJobRetainCount(),
+            pruneHistory.getHistoryPruneThresholdMs());
+      }
+    }.run();
+  }
+
   private LogEntry createTransaction(Op... ops) {
     return LogEntry.transaction(
         new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
new file mode 100644
index 0000000..1688a33
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.testing;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A simulated scheduled executor that records scheduled work and executes it when the clock is
+ * advanced past their execution time.
+ */
+public final class FakeScheduledExecutor extends FakeClock {
+
+  private final List<Pair<Long, Runnable>> deferredWork = Lists.newArrayList();
+
+  private FakeScheduledExecutor() { }
+
+  public static FakeScheduledExecutor scheduleExecutor(ScheduledExecutorService mock) {
+    FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.schedule(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andAnswer(addExpectations(executor, 1)).anyTimes();
+
+    return executor;
+  }
+
+  public static FakeScheduledExecutor scheduleAtFixedRateExecutor(
+      ScheduledExecutorService mock,
+      int maxInvocations) {
+
+    FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.scheduleAtFixedRate(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.anyLong(),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andAnswer(addExpectations(executor, maxInvocations)).once();
+
+    return executor;
+  }
+
+  private static IAnswer<ScheduledFuture<?>> addExpectations(
+      final FakeScheduledExecutor executor,
+      final int workCount) {
+
+    return new IAnswer<ScheduledFuture<?>>() {
+      @Override
+      public ScheduledFuture<?> answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        Runnable work = (Runnable) args[0];
+        long millis = (Long) args[1];
+        Preconditions.checkArgument(millis > 0);
+        for (int i = 1; i <= workCount; i++) {
+          executor.deferredWork.add(Pair.of(executor.nowMillis() + i * millis, work));
+        }
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public void setNowMillis(long nowMillis) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void advance(Amount<Long, Time> period) {
+    super.advance(period);
+    Iterator<Pair<Long, Runnable>> entries = deferredWork.iterator();
+    List<Runnable> toExecute = Lists.newArrayList();
+    while (entries.hasNext()) {
+      Pair<Long, Runnable> next = entries.next();
+      if (next.getFirst() <= nowMillis()) {
+        entries.remove();
+        toExecute.add(next.getSecond());
+      }
+    }
+    for (Runnable work : toExecute) {
+      work.run();
+    }
+  }
+
+  public void assertEmpty() {
+    assertEquals(ImmutableList.<Pair<Long, Runnable>>of(), deferredWork);
+  }
+}