You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/03/07 01:27:48 UTC

git commit: Count on task timeouts and task pruning to be idempotent, simplifying handling code.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6e4fe853e -> bb52bcbf6


Count on task timeouts and task pruning to be idempotent, simplifying handling
code.

Reviewed at https://reviews.apache.org/r/18484/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/bb52bcbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/bb52bcbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/bb52bcbf

Branch: refs/heads/master
Commit: bb52bcbf6538d7de53d20849c0fdffca0137f9c2
Parents: 6e4fe85
Author: Bill Farner <wf...@apache.org>
Authored: Thu Mar 6 16:26:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Mar 6 16:26:54 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/HistoryPruner.java   | 100 +++------
 .../aurora/scheduler/async/TaskTimeout.java     | 196 ++---------------
 .../scheduler/async/HistoryPrunerTest.java      | 218 ++++++++-----------
 .../aurora/scheduler/async/TaskTimeoutTest.java |  96 --------
 4 files changed, 156 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index c80c000..5bf9838 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -18,10 +18,7 @@ package org.apache.aurora.scheduler.async;
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
@@ -29,20 +26,20 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -55,7 +52,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 
 /**
  * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
@@ -64,19 +60,12 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 public class HistoryPruner implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
 
-  private final Multimap<IJobKey, String> tasksByJob =
-      Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
-  @VisibleForTesting
-  Multimap<IJobKey, String> getTasksByJob() {
-    return tasksByJob;
-  }
-
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final Clock clock;
   private final long pruneThresholdMillis;
   private final int perJobHistoryGoal;
-  private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
+  private final Storage storage;
 
   @BindingAnnotation
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@@ -88,13 +77,15 @@ public class HistoryPruner implements EventSubscriber {
       final StateManager stateManager,
       final Clock clock,
       @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
-      @PruneThreshold int perJobHistoryGoal) {
+      @PruneThreshold int perJobHistoryGoal,
+      Storage storage) {
 
     this.executor = checkNotNull(executor);
     this.stateManager = checkNotNull(stateManager);
     this.clock = checkNotNull(clock);
     this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
     this.perJobHistoryGoal = perJobHistoryGoal;
+    this.storage = checkNotNull(storage);
   }
 
   @VisibleForTesting
@@ -125,21 +116,9 @@ public class HistoryPruner implements EventSubscriber {
     stateManager.deleteTasks(taskIds);
   }
 
-  /**
-   * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
-   *
-   * @param event A new TasksDeleted event.
-   */
-  @Subscribe
-  public void tasksDeleted(final TasksDeleted event) {
-    for (IScheduledTask task : event.getTasks()) {
-      String id = Tasks.id(task);
-      tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
-      Future<?> future = taskIdToFuture.remove(id);
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
+  @VisibleForTesting
+  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
+    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
   }
 
   private void registerInactiveTask(
@@ -148,41 +127,34 @@ public class HistoryPruner implements EventSubscriber {
       long timeRemaining) {
 
     LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    // Insert the latest inactive task at the tail.
-    tasksByJob.put(jobKey, taskId);
-    Runnable runnable = new Runnable() {
+    executor.schedule(
+        new Runnable() {
+          @Override
+          public void run() {
+            LOG.info("Pruning expired inactive task " + taskId);
+            deleteTasks(ImmutableSet.of(taskId));
+          }
+        },
+        timeRemaining,
+        TimeUnit.MILLISECONDS);
+
+    executor.submit(new Runnable() {
       @Override
       public void run() {
-        LOG.info("Pruning expired inactive task " + taskId);
-        tasksByJob.remove(jobKey, taskId);
-        taskIdToFuture.remove(taskId);
-        deleteTasks(ImmutableSet.of(taskId));
-      }
-    };
-    taskIdToFuture.put(taskId, executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS));
-
-    ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
-    Collection<String> tasks = tasksByJob.get(jobKey);
-    // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
-    // multimap when accessing any of its collection views".
-    synchronized (tasksByJob) {
-      Iterator<String> iterator = tasks.iterator();
-      while (tasks.size() > perJobHistoryGoal) {
-        // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
-        // order.
-        String id = iterator.next();
-        iterator.remove();
-        pruneTaskIds.add(id);
-        Future<?> future = taskIdToFuture.remove(id);
-        if (future != null) {
-          future.cancel(false);
+        Collection<IScheduledTask> inactiveTasks =
+            Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+        int tasksToPrune = inactiveTasks.size() - perJobHistoryGoal;
+        if (tasksToPrune > 0) {
+          if (inactiveTasks.size() > perJobHistoryGoal) {
+            Set<String> toPrune = FluentIterable
+                .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+                .limit(tasksToPrune)
+                .transform(Tasks.SCHEDULED_TO_ID)
+                .toSet();
+            deleteTasks(toPrune);
+          }
         }
       }
-    }
-
-    Set<String> ids = pruneTaskIds.build();
-    if (!ids.isEmpty()) {
-      deleteTasks(ids);
-    }
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 82b483b..de79912 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -16,9 +16,7 @@
 package org.apache.aurora.scheduler.async;
 
 import java.util.EnumSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,19 +25,11 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -59,9 +49,6 @@ class TaskTimeout implements EventSubscriber {
   static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
 
   @VisibleForTesting
-  static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
-
-  @VisibleForTesting
   static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
 
   @VisibleForTesting
@@ -72,76 +59,22 @@ class TaskTimeout implements EventSubscriber {
       ScheduleStatus.KILLING,
       ScheduleStatus.DRAINING);
 
-  private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
-
-  private static final class TimeoutKey {
-    private final String taskId;
-    private final ScheduleStatus status;
-
-    private TimeoutKey(String taskId, ScheduleStatus status) {
-      this.taskId = taskId;
-      this.status = status;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(taskId, status);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof TimeoutKey)) {
-        return false;
-      }
-      TimeoutKey key = (TimeoutKey) o;
-      return Objects.equal(taskId, key.taskId)
-          && (status == key.status);
-    }
-
-    @Override
-    public String toString() {
-      return taskId + ":" + status;
-    }
-  }
-
   private final ScheduledExecutorService executor;
   private final StateManager stateManager;
   private final long timeoutMillis;
-  private final Clock clock;
   private final AtomicLong timedOutTasks;
 
   @Inject
   TaskTimeout(
       ScheduledExecutorService executor,
       StateManager stateManager,
-      final Clock clock,
       Amount<Long, Time> timeout,
       StatsProvider statsProvider) {
 
     this.executor = checkNotNull(executor);
     this.stateManager = checkNotNull(stateManager);
     this.timeoutMillis = timeout.as(Time.MILLISECONDS);
-    this.clock = checkNotNull(clock);
     this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-
-    exportStats(statsProvider);
-  }
-
-  private void registerTimeout(TimeoutKey key) {
-    // This is an obvious check-then-act, but:
-    //   - there isn't much of a better option, given that we have to get the Future before
-    //     inserting into the map
-    //   - a key collision only happens in practice if something is wrong externally to this class
-    //     (double event for the same state)
-    //   - the outcome is low-risk, we would wind up with a redundant Future that will eventually
-    //     no-op
-    if (!futures.containsKey(key)) {
-      Future<?> timeoutHandler = executor.schedule(
-          new TimedOutTaskHandler(key),
-          timeoutMillis,
-          TimeUnit.MILLISECONDS);
-      futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
-    }
   }
 
   private static boolean isTransient(ScheduleStatus status) {
@@ -150,112 +83,31 @@ class TaskTimeout implements EventSubscriber {
 
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
-    String taskId = change.getTaskId();
-    ScheduleStatus newState = change.getNewState();
-    if (change.isTransition() && isTransient(change.getOldState().get())) {
-      TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState().get());
-      Context context = futures.remove(oldKey);
-      if (context != null) {
-        LOG.fine("Canceling state timeout for task " + oldKey);
-        context.future.cancel(false);
-      }
-    }
-
+    final String taskId = change.getTaskId();
+    final ScheduleStatus newState = change.getNewState();
     if (isTransient(newState)) {
-      registerTimeout(new TimeoutKey(taskId, change.getNewState()));
-    }
-  }
-
-  private class TimedOutTaskHandler implements Runnable {
-    private final TimeoutKey key;
-
-    TimedOutTaskHandler(TimeoutKey key) {
-      this.key = key;
-    }
-
-    @Override
-    public void run() {
-      Context context = futures.get(key);
-      try {
-        if (context == null) {
-          LOG.warning("Timeout context not found for " + key);
-          return;
-        }
-
-        LOG.info("Timeout reached for task " + key);
-        // This query acts as a CAS by including the state that we expect the task to be in if the
-        // timeout is still valid.  Ideally, the future would have already been canceled, but in the
-        // event of a state transition race, including transientState prevents an unintended
-        // task timeout.
-        // Note: This requires LOST transitions trigger Driver.killTask.
-        if (stateManager.changeState(
-            key.taskId,
-            Optional.of(key.status),
-            ScheduleStatus.LOST,
-            TIMEOUT_MESSAGE)) {
-
-          timedOutTasks.incrementAndGet();
-        } else {
-          LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
-        }
-      } finally {
-        futures.remove(key);
-      }
-    }
-  }
-
-  private class Context {
-    private final long timestampMillis;
-    private final Future<?> future;
-
-    Context(long timestampMillis, Future<?> future) {
-      this.timestampMillis = timestampMillis;
-      this.future = future;
-    }
-  }
-
-  private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
-    @Override
-    public Long apply(Context context) {
-      return context.timestampMillis;
-    }
-  };
-
-  private static final Ordering<Context> TIMESTAMP_ORDER =
-      Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
-
-  @VisibleForTesting
-  static String waitingTimeStatName(ScheduleStatus status) {
-    return "scheduler_max_" + status + "_waiting_ms";
-  }
-
-  private void exportStats(StatsProvider statsProvider) {
-    statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
-      @Override
-      public Number get() {
-          return futures.size();
-        }
-    });
-
-    for (final ScheduleStatus status : TRANSIENT_STATES) {
-      statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
-        private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
-          @Override
-          public boolean apply(TimeoutKey key) {
-            return key.status == status;
-          }
-        };
-
-        @Override
-        public Number get() {
-          Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
-          if (Iterables.isEmpty(matches)) {
-            return 0L;
-          } else {
-            return clock.nowMillis() - TIMESTAMP_ORDER.min(matches).timestampMillis;
-          }
-        }
-      });
+      executor.schedule(
+          new Runnable() {
+            @Override
+            public void run() {
+              // This query acts as a CAS by including the state that we expect the task to be in if
+              // the timeout is still valid.  Ideally, the future would have already been canceled,
+              // but in the event of a state transition race, including transientState prevents an
+              // unintended task timeout.
+              // Note: This requires LOST transitions trigger Driver.killTask.
+              if (stateManager.changeState(
+                  taskId,
+                  Optional.of(newState),
+                  ScheduleStatus.LOST,
+                  TIMEOUT_MESSAGE)) {
+
+                LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+                timedOutTasks.incrementAndGet();
+              }
+            }
+          },
+          timeoutMillis,
+          TimeUnit.MILLISECONDS);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
index 811f68c..f7c9e5e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
@@ -17,13 +17,15 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
@@ -43,11 +45,10 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,11 +58,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class HistoryPrunerTest extends EasyMockTest {
@@ -77,6 +75,7 @@ public class HistoryPrunerTest extends EasyMockTest {
   private ScheduledExecutorService executor;
   private FakeClock clock;
   private StateManager stateManager;
+  private StorageTestUtil storageUtil;
   private HistoryPruner pruner;
 
   @Before
@@ -85,21 +84,15 @@ public class HistoryPrunerTest extends EasyMockTest {
     executor = createMock(ScheduledExecutorService.class);
     clock = new FakeClock();
     stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
     pruner = new HistoryPruner(
         executor,
         stateManager,
         clock,
         ONE_DAY,
-        PER_JOB_HISTORY);
-  }
-
-  @After
-  public void validateNoLeak() {
-    synchronized (pruner.getTasksByJob()) {
-      assertEquals(
-          ImmutableMultimap.<IJobKey, String>of(),
-          ImmutableMultimap.copyOf(pruner.getTasksByJob()));
-    }
+        PER_JOB_HISTORY,
+        storageUtil.storage);
   }
 
   @Test
@@ -111,18 +104,15 @@ public class HistoryPrunerTest extends EasyMockTest {
     long taskBTimestamp = clock.nowMillis();
     IScheduledTask b = makeTask("b", LOST);
 
-    expectOneTaskWatch(taskATimestamp);
-    expectOneTaskWatch(taskBTimestamp);
-
-    expectCancelFuture().times(2);
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
 
     control.replay();
 
     pruner.recordStateChange(TaskStateChange.initialized(a));
     pruner.recordStateChange(TaskStateChange.initialized(b));
-
-    // Clean-up
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a, b)));
   }
 
   @Test
@@ -142,53 +132,48 @@ public class HistoryPrunerTest extends EasyMockTest {
     IScheduledTask d = makeTask("d", FINISHED);
     IScheduledTask e = makeTask("job-x", "e", FINISHED);
 
-    expectOneTaskWatch(taskATimestamp);
-    expectOneTaskWatch(taskBTimestamp);
-    expectOneTaskWatch(taskCTimestamp);
-    expectDefaultTaskWatch();
-    expectDefaultTaskWatch();
-
-    // Cancel future and delete pruned task "a" asynchronously.
-    expectCancelFuture();
-    stateManager.deleteTasks(Tasks.ids(a));
-
-    // Cancel future and delete pruned task "b" asynchronously.
-    expectCancelFuture();
-    stateManager.deleteTasks(Tasks.ids(b));
-
-    expectCancelFuture().times(3);
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectOneDelayedPrune(taskCTimestamp);
+    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(e));
+    expectDefaultDelayedPrune();
 
     control.replay();
 
     for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
       pruner.recordStateChange(TaskStateChange.initialized(task));
     }
-
-    // Clean-up
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(c, d, e)));
   }
 
   @Test
   public void testStateChange() {
-    expectDefaultTaskWatch();
+    IScheduledTask starting = makeTask("a", STARTING);
+    IScheduledTask running = copy(starting, RUNNING);
+    IScheduledTask killed = copy(starting, KILLED);
 
-    expectCancelFuture();
+    expectNoImmediatePrune(ImmutableSet.of(killed));
+    expectDefaultDelayedPrune();
 
     control.replay();
 
     // No future set for non-terminal state transition.
-    changeState(STARTING, RUNNING);
+    changeState(starting, running);
 
     // Future set for terminal state transition.
-    IScheduledTask a = changeState(RUNNING, KILLED);
-
-    // Clean-up
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
+    changeState(running, killed);
   }
 
   @Test
   public void testActivateFutureAndExceedHistoryGoal() {
-    Capture<Runnable> delayedDelete = expectDefaultTaskWatch();
+    IScheduledTask running = makeTask("a", RUNNING);
+    IScheduledTask killed = copy(running, KILLED);
+    expectNoImmediatePrune(ImmutableSet.of(running));
+    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
 
     // Expect task "a" to be pruned when future is activated.
     stateManager.deleteTasks(ImmutableSet.of("a"));
@@ -196,7 +181,7 @@ public class HistoryPrunerTest extends EasyMockTest {
     control.replay();
 
     // Capture future for inactive task "a"
-    changeState("a", RUNNING, KILLED);
+    changeState(running, killed);
     clock.advance(ONE_HOUR);
     // Execute future to prune task "a" from the system.
     delayedDelete.getValue().run();
@@ -204,43 +189,30 @@ public class HistoryPrunerTest extends EasyMockTest {
 
   @Test
   public void testJobHistoryExceeded() {
-    // Future for tasks - a,b,c
-    expectDefaultTaskWatchTimes(3);
-
-    // Cancel future and delete task "a" asynchronously when history goal is exceeded.
-    expectCancelFuture();
-    stateManager.deleteTasks(ImmutableSet.of("a"));
-
-    expectCancelFuture().times(2);
-
-    control.replay();
-
-    changeState("a", RUNNING, KILLED);
+    IScheduledTask a = makeTask("a", RUNNING);
     clock.advance(ONE_HOUR);
-    IScheduledTask b = changeState("b", RUNNING, KILLED);
+    IScheduledTask aKilled = copy(a, KILLED);
+
+    IScheduledTask b = makeTask("b", RUNNING);
     clock.advance(ONE_HOUR);
-    IScheduledTask c = changeState("c", RUNNING, LOST);
+    IScheduledTask bKilled = copy(b, KILLED);
 
-    // Clean-up
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(b, c)));
-  }
+    IScheduledTask c = makeTask("c", RUNNING);
+    clock.advance(ONE_HOUR);
+    IScheduledTask cLost = copy(c, LOST);
 
-  @Test
-  public void testTasksDeleted() {
-    IScheduledTask a = makeTask("a", FINISHED);
-    IScheduledTask b = makeTask("b", FINISHED);
-    expectDefaultTaskWatch();
-    expectCancelFuture();
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectDefaultDelayedPrune();
+    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectDefaultDelayedPrune();
 
     control.replay();
 
-    pruner.recordStateChange(TaskStateChange.initialized(a));
-
-    // Cancels existing future for task 'a'
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
-
-    // No-Op
-    pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(b)));
+    changeState(a, aKilled);
+    changeState(b, bKilled);
+    changeState(c, cLost);
   }
 
   // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
@@ -257,32 +229,9 @@ public class HistoryPrunerTest extends EasyMockTest {
       @Override
       public void execute() {
         // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        changeState("b", ASSIGNED, STARTING);
-      }
-    };
-    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
-
-    control.replay();
-
-    // Change the task to a terminal state and wait for it to be pruned.
-    changeState(TASK_ID, RUNNING, KILLED);
-    taskDeleted.await();
-  }
-
-  @Test
-  public void testThreadSafeDeleteEvents() throws Exception {
-    // This tests against regression where deleting a task causes an event to be fired
-    // synchronously (on the EventBus) from a separate thread.  This posed a problem because
-    // the event handler was synchronized, causing the EventBus thread to deadlock acquiring
-    // the lock held by the thread deleting tasks.
+        IScheduledTask b = makeTask("b", ASSIGNED);
 
-    pruner = prunerWithRealExecutor();
-    Command onDeleted = new Command() {
-      @Override
-      public void execute() {
-        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        pruner.tasksDeleted(
-            new TasksDeleted(ImmutableSet.of(makeTask("a", ScheduleStatus.KILLED))));
+        changeState(b, STARTING);
       }
     };
     CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
@@ -290,7 +239,7 @@ public class HistoryPrunerTest extends EasyMockTest {
     control.replay();
 
     // Change the task to a terminal state and wait for it to be pruned.
-    changeState(TASK_ID, RUNNING, KILLED);
+    changeState(makeTask(TASK_ID, RUNNING), KILLED);
     taskDeleted.await();
   }
 
@@ -305,7 +254,8 @@ public class HistoryPrunerTest extends EasyMockTest {
         stateManager,
         clock,
         Amount.of(1L, Time.MILLISECONDS),
-        PER_JOB_HISTORY);
+        PER_JOB_HISTORY,
+        storageUtil.storage);
   }
 
   private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
@@ -348,19 +298,44 @@ public class HistoryPrunerTest extends EasyMockTest {
     return eventDelivered;
   }
 
-  private Capture<Runnable> expectDefaultTaskWatch() {
-    return expectTaskWatch(ONE_DAY.as(Time.MILLISECONDS), 1);
+  private Capture<Runnable> expectDefaultDelayedPrune() {
+    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
   }
 
-  private Capture<Runnable> expectDefaultTaskWatchTimes(int count) {
-    return expectTaskWatch(ONE_DAY.as(Time.MILLISECONDS), count);
+  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+    return expectDelayedPrune(timestampMillis, 1);
   }
 
-  private Capture<Runnable> expectOneTaskWatch(long timestampMillis) {
-    return expectTaskWatch(timestampMillis, 1);
+  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+    expectImmediatePrune(tasksInJob);
+  }
+
+  private void expectImmediatePrune(
+      ImmutableSet<IScheduledTask> tasksInJob,
+      IScheduledTask... pruned) {
+
+    // Expect a deferred prune operation when a new task is being watched.
+    executor.submit(EasyMock.<Runnable>anyObject());
+    expectLastCall().andAnswer(
+        new IAnswer<Future<?>>() {
+          @Override
+          public Future<?> answer() {
+            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+            work.run();
+            return null;
+          }
+        }
+    );
+
+    IJobKey jobKey = Iterables.getOnlyElement(
+        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+    storageUtil.expectTaskFetch(HistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+    if (pruned.length > 0) {
+      stateManager.deleteTasks(Tasks.ids(pruned));
+    }
   }
 
-  private Capture<Runnable> expectTaskWatch(long timestampMillis, int count) {
+  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
     Capture<Runnable> capture = createCapture();
     executor.schedule(
         EasyMock.capture(capture),
@@ -370,18 +345,17 @@ public class HistoryPrunerTest extends EasyMockTest {
     return capture;
   }
 
-  private IExpectationSetters<?> expectCancelFuture() {
-    return expect(future.cancel(false)).andReturn(true);
+  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
   }
 
-  private IScheduledTask changeState(ScheduleStatus from, ScheduleStatus to) {
-    return changeState(TASK_ID, from, to);
+  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+    pruner.recordStateChange(
+        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
   }
 
-  private IScheduledTask changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
-    IScheduledTask task = makeTask(taskId, to);
-    pruner.recordStateChange(TaskStateChange.transition(task, from));
-    return task;
+  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+    return IScheduledTask.build(task.newBuilder().setStatus(status));
   }
 
   private IScheduledTask makeTask(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 71d6a9e..bd1f599 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -15,16 +15,13 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
@@ -41,13 +38,10 @@ import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
@@ -55,7 +49,6 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.easymock.EasyMock.eq;
@@ -69,9 +62,6 @@ public class TaskTimeoutTest extends EasyMockTest {
   private static final long TIMEOUT_MS = Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS);
 
   private AtomicLong timedOutTaskCounter;
-  private Capture<Supplier<Number>> stateCountCapture;
-  private Map<ScheduleStatus, Capture<Supplier<Number>>> stateCaptures;
-
   private ScheduledExecutorService executor;
   private ScheduledFuture<?> future;
   private StateManager stateManager;
@@ -86,33 +76,9 @@ public class TaskTimeoutTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
     clock = new FakeClock();
     statsProvider = createMock(StatsProvider.class);
-    expectStatsProvider();
-  }
-
-  @After
-  public void verifyTasksDepleted() {
-    // Verify there is no memory leak.
-    assertEquals(0, stateCountCapture.getValue().get().intValue());
-  }
-
-  private void expectStatsProvider() {
     timedOutTaskCounter = new AtomicLong();
     expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER))
         .andReturn(timedOutTaskCounter);
-
-    stateCountCapture = createCapture();
-    expect(statsProvider.makeGauge(
-        eq(TaskTimeout.TRANSIENT_COUNT_STAT_NAME),
-        EasyMock.capture(stateCountCapture))).andReturn(null);
-
-    stateCaptures = Maps.newHashMap();
-    for (ScheduleStatus status : TaskTimeout.TRANSIENT_STATES) {
-      Capture<Supplier<Number>> statusCapture = createCapture();
-      expect(statsProvider.makeGauge(
-          eq(TaskTimeout.waitingTimeStatName(status)),
-          EasyMock.capture(statusCapture))).andReturn(null);
-      stateCaptures.put(status, statusCapture);
-    }
   }
 
   private void replayAndCreate() {
@@ -120,7 +86,6 @@ public class TaskTimeoutTest extends EasyMockTest {
     timeout = new TaskTimeout(
         executor,
         stateManager,
-        clock,
         Amount.of(TIMEOUT_MS, Time.MILLISECONDS),
         statsProvider);
   }
@@ -139,10 +104,6 @@ public class TaskTimeoutTest extends EasyMockTest {
     return expectTaskWatch(TIMEOUT_MS);
   }
 
-  private IExpectationSetters<?> expectCancel() {
-    return expect(future.cancel(false)).andReturn(true);
-  }
-
   private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
     IScheduledTask task = IScheduledTask.build(new ScheduledTask()
         .setStatus(to)
@@ -157,9 +118,7 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testNormalTransitions() {
     expectTaskWatch();
-    expectCancel();
     expectTaskWatch();
-    expectCancel();
 
     replayAndCreate();
 
@@ -174,7 +133,6 @@ public class TaskTimeoutTest extends EasyMockTest {
   @Test
   public void testTransientToTransient() {
     expectTaskWatch();
-    expectCancel();
     Capture<Runnable> killingTimeout = expectTaskWatch();
     expect(stateManager.changeState(
         TASK_ID,
@@ -241,11 +199,9 @@ public class TaskTimeoutTest extends EasyMockTest {
 
   @Test
   public void testStorageStart() {
-
     expectTaskWatch(TIMEOUT_MS);
     expectTaskWatch(TIMEOUT_MS);
     expectTaskWatch(TIMEOUT_MS);
-    expectCancel().times(3);
 
     replayAndCreate();
 
@@ -262,56 +218,4 @@ public class TaskTimeoutTest extends EasyMockTest {
     changeState("b", KILLING, KILLED);
     changeState("c", PREEMPTING, FINISHED);
   }
-
-  private void checkOutstandingTimer(ScheduleStatus status, long expectedValue) {
-    long value = stateCaptures.get(status).getValue().get().longValue();
-    assertEquals(expectedValue, value);
-  }
-
-  @Test
-  public void testOutstandingTimers() throws Exception {
-    expectTaskWatch();
-    expectTaskWatch();
-    expectCancel();
-    expectTaskWatch();
-    expectCancel().times(2);
-
-    replayAndCreate();
-
-    checkOutstandingTimer(ASSIGNED, 0);
-    checkOutstandingTimer(PREEMPTING, 0);
-    checkOutstandingTimer(RESTARTING, 0);
-    checkOutstandingTimer(DRAINING, 0);
-    checkOutstandingTimer(KILLING, 0);
-
-    changeState("a", PENDING, ASSIGNED);
-
-    Amount<Long, Time> tick = Amount.of(10L, Time.SECONDS);
-    clock.advance(tick);
-
-    checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS));
-
-    clock.advance(tick);
-    changeState("b", PENDING, ASSIGNED);
-
-    clock.advance(tick);
-    checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS) * 3);
-
-    changeState("a", ASSIGNED, RUNNING);
-    clock.advance(tick);
-    changeState("a", RUNNING, KILLING);
-    clock.advance(tick);
-
-    checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS) * 3);
-    checkOutstandingTimer(KILLING, tick.as(Time.MILLISECONDS));
-
-    changeState("a", KILLING, KILLED);
-    changeState("b", ASSIGNED, FINISHED);
-
-    checkOutstandingTimer(ASSIGNED, 0);
-    checkOutstandingTimer(PREEMPTING, 0);
-    checkOutstandingTimer(RESTARTING, 0);
-    checkOutstandingTimer(DRAINING, 0);
-    checkOutstandingTimer(KILLING, 0);
-  }
 }