You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/04/08 01:25:29 UTC

git commit: Remove duplicate task throttling behavior from TaskGroups, fix a few race conditions.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 589fa44c1 -> f73e29b31


Remove duplicate task throttling behavior from TaskGroups, fix a few race conditions.

Bugs closed: AURORA-302

Reviewed at https://reviews.apache.org/r/20066/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f73e29b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f73e29b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f73e29b3

Branch: refs/heads/master
Commit: f73e29b3196f13e37d31c8e02dd17628edb548e0
Parents: 589fa44
Author: Bill Farner <wf...@apache.org>
Authored: Mon Apr 7 16:22:50 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Apr 7 16:22:50 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/TaskGroup.java       | 112 ++++-----------
 .../aurora/scheduler/async/TaskGroups.java      | 135 ++++++++-----------
 .../aurora/scheduler/async/TaskScheduler.java   |  23 ++--
 .../aurora/scheduler/async/TaskGroupsTest.java  | 130 ++++++++++++++++++
 .../scheduler/async/TaskSchedulerImplTest.java  |  30 ++---
 .../scheduler/async/TaskSchedulerTest.java      |  28 ++--
 6 files changed, 241 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
index ece7e3a..0aa54cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -17,16 +17,10 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.twitter.common.base.Function;
-import com.twitter.common.util.BackoffStrategy;
+import com.google.common.collect.Lists;
 
 import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 
@@ -35,109 +29,51 @@ import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
  */
 class TaskGroup {
   private final GroupKey key;
-  private final BackoffStrategy backoffStrategy;
+  private long penaltyMs;
+  private final Queue<String> tasks;
 
-  private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
-    @Override
-    public Long apply(Task item) {
-      return item.readyTimestampMs;
-    }
-  };
-
-  // Order the tasks by the time they are ready to be scheduled
-  private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
-  // 11 is the magic number used by PriorityBlockingQueue as the initial size.
-  private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
-  // Penalty for the task group for failing to schedule.
-  private final AtomicLong penaltyMs;
-
-  TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
+  TaskGroup(GroupKey key, String initialTaskId) {
     this.key = key;
-    this.backoffStrategy = backoffStrategy;
-    penaltyMs = new AtomicLong();
-    resetPenaltyAndGet();
+    this.penaltyMs = 0;
+    this.tasks = Lists.newLinkedList();
+    this.tasks.add(initialTaskId);
   }
 
-  GroupKey getKey() {
+  synchronized GroupKey getKey() {
     return key;
   }
 
-  private static final Function<Task, String> TO_TASK_ID =
-      new Function<Task, String>() {
-        @Override
-        public String apply(Task item) {
-          return item.taskId;
-        }
-      };
-
-  /**
-   * Removes the task at the head of the queue.
-   *
-   * @return String the id of the head task.
-   * @throws IllegalStateException if the queue is empty.
-   */
-  String pop() throws IllegalStateException {
-    Task head = tasks.poll();
-    Preconditions.checkState(head != null);
-    return head.taskId;
+  synchronized Optional<String> peek() {
+    return Optional.fromNullable(tasks.peek());
   }
 
-  void remove(String taskId) {
-    Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
+  synchronized boolean hasMore() {
+    return !tasks.isEmpty();
   }
 
-  void push(final String taskId, long readyTimestamp) {
-    tasks.offer(new Task(taskId, readyTimestamp));
+  synchronized void remove(String taskId) {
+    tasks.remove(taskId);
   }
 
-  synchronized long resetPenaltyAndGet() {
-    penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
-    return getPenaltyMs();
+  synchronized void offer(String taskId) {
+    tasks.offer(taskId);
   }
 
-  synchronized long penalizeAndGet() {
-    penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
-    return getPenaltyMs();
+  synchronized void setPenaltyMs(long penaltyMs) {
+    this.penaltyMs = penaltyMs;
   }
 
-  GroupState isReady(long nowMs) {
-    Task task = tasks.peek();
-    if (task == null) {
-      return GroupState.EMPTY;
-    }
-
-    if (task.readyTimestampMs > nowMs) {
-      return GroupState.NOT_READY;
-    }
-    return GroupState.READY;
-  }
   // Begin methods used for debug interfaces.
 
-  public String getName() {
+  public synchronized String getName() {
     return key.toString();
   }
 
-  public Set<String> getTaskIds() {
-    return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
-  }
-
-  public long getPenaltyMs() {
-    return penaltyMs.get();
-  }
-
-  private static class Task {
-    private final String taskId;
-    private final long readyTimestampMs;
-
-    Task(String taskId, long readyTimestampMs) {
-      this.taskId = Preconditions.checkNotNull(taskId);
-      this.readyTimestampMs = readyTimestampMs;
-    }
+  public synchronized Set<String> getTaskIds() {
+    return ImmutableSet.copyOf(tasks);
   }
 
-  enum GroupState {
-    EMPTY,      // The group is empty.
-    NOT_READY,  // Every task in the group is not ready yet.
-    READY       // The task at the head of the queue is ready.
+  public synchronized long getPenaltyMs() {
+    return penaltyMs;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 6c95c6f..6aff091 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -15,20 +15,18 @@
  */
 package org.apache.aurora.scheduler.async;
 
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,10 +48,11 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.TaskGroup.GroupState;
 
 /**
  * A collection of task groups, where a task group is a collection of tasks that are known to be
@@ -66,10 +65,10 @@ import static org.apache.aurora.scheduler.async.TaskGroup.GroupState;
  */
 public class TaskGroups implements EventSubscriber {
 
-  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
-  private final LoadingCache<GroupKey, TaskGroup> groups;
-  private final Clock clock;
+  private final ConcurrentMap<GroupKey, TaskGroup> groups = Maps.newConcurrentMap();
+  private final ScheduledExecutorService executor;
+  private final TaskScheduler taskScheduler;
+  private final BackoffStrategy backoff;
   private final RescheduleCalculator rescheduleCalculator;
 
   static class TaskGroupsSettings {
@@ -95,98 +94,64 @@ public class TaskGroups implements EventSubscriber {
         settings.taskGroupBackoff,
         settings.rateLimiter,
         taskScheduler,
-        clock,
         rescheduleCalculator);
   }
 
   @VisibleForTesting
   TaskGroups(
       final ScheduledExecutorService executor,
-      final BackoffStrategy taskGroupBackoffStrategy,
+      final BackoffStrategy backoff,
       final RateLimiter rateLimiter,
       final TaskScheduler taskScheduler,
-      final Clock clock,
       final RescheduleCalculator rescheduleCalculator) {
 
-    checkNotNull(executor);
-    checkNotNull(taskGroupBackoffStrategy);
+    this.executor = checkNotNull(executor);
     checkNotNull(rateLimiter);
-    checkNotNull(taskScheduler);
-    this.clock = checkNotNull(clock);
+    this.taskScheduler = checkNotNull(taskScheduler);
+    this.backoff = checkNotNull(backoff);
     this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
 
     final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
       @Override
-      public TaskSchedulerResult schedule(String taskId) {
+      public boolean schedule(String taskId) {
         rateLimiter.acquire();
         return taskScheduler.schedule(taskId);
       }
     };
-
-    groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
-      @Override
-      public TaskGroup load(GroupKey key) {
-        TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
-        LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
-        startGroup(group, executor, ratelLimitedScheduler);
-        return group;
-      }
-    });
   }
 
-  private synchronized boolean maybeInvalidate(TaskGroup group) {
-    if (group.getTaskIds().isEmpty()) {
-      groups.invalidate(group.getKey());
-      return true;
+  private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
+    // Avoid check-then-act by holding the intrinsic lock.  If not done atomically, we could
+    // remove a group while a task is being added to it.
+    if (group.hasMore()) {
+      executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
+    } else {
+      groups.remove(group.getKey());
     }
-    return false;
   }
 
-  private void startGroup(
-      final TaskGroup group,
-      final ScheduledExecutorService executor,
-      final TaskScheduler taskScheduler) {
-
+  private void startGroup(final TaskGroup group) {
     Runnable monitor = new Runnable() {
       @Override
       public void run() {
-        GroupState state = group.isReady(clock.nowMillis());
-
-        switch (state) {
-          case EMPTY:
-            maybeInvalidate(group);
-            break;
-
-          case READY:
-            String id = group.pop();
-            TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
-            switch (result) {
-              case SUCCESS:
-                if (!maybeInvalidate(group)) {
-                  executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
-                }
-                break;
-
-              case TRY_AGAIN:
-                group.push(id, clock.nowMillis());
-                executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
-                break;
-
-              default:
-                throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
+        Optional<String> taskId = group.peek();
+        long penaltyMs = 0;
+        if (taskId.isPresent()) {
+          if (taskScheduler.schedule(taskId.get())) {
+            group.remove(taskId.get());
+            if (group.hasMore()) {
+              penaltyMs = backoff.calculateBackoffMs(0);
             }
-            break;
-
-          case NOT_READY:
-            executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
-            break;
-
-          default:
-            throw new IllegalStateException("Unknown GroupState " + state);
+          } else {
+            penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
+          }
         }
+
+        group.setPenaltyMs(penaltyMs);
+        evaluateGroupLater(this, group);
       }
     };
-    executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+    evaluateGroupLater(monitor, group);
   }
 
   private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
@@ -205,11 +170,6 @@ public class TaskGroups implements EventSubscriber {
     return executor;
   }
 
-  private synchronized void add(IAssignedTask task, long scheduleDelayMs) {
-    groups.getUnchecked(new GroupKey(task.getTask()))
-        .push(task.getTaskId(), clock.nowMillis() + scheduleDelayMs);
-  }
-
   /**
    * Informs the task groups of a task state change.
    * <p>
@@ -222,10 +182,21 @@ public class TaskGroups implements EventSubscriber {
   public synchronized void taskChangedState(TaskStateChange stateChange) {
     if (stateChange.getNewState() == PENDING) {
       IScheduledTask task = stateChange.getTask();
-      long readyAtMs = stateChange.isTransition()
-          ? rescheduleCalculator.getFlappingPenaltyMs(task)
-          : rescheduleCalculator.getStartupScheduleDelayMs(task);
-      add(task.getAssignedTask(), readyAtMs);
+      GroupKey key = new GroupKey(task.getAssignedTask().getTask());
+      TaskGroup newGroup = new TaskGroup(key, Tasks.id(task));
+      TaskGroup existing = groups.putIfAbsent(key, newGroup);
+      if (existing == null) {
+        long penaltyMs;
+        if (stateChange.isTransition()) {
+          penaltyMs = backoff.calculateBackoffMs(0);
+        } else {
+          penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
+        }
+        newGroup.setPenaltyMs(penaltyMs);
+        startGroup(newGroup);
+      } else {
+        existing.offer(Tasks.id(task));
+      }
     }
   }
 
@@ -238,7 +209,7 @@ public class TaskGroups implements EventSubscriber {
   public synchronized void tasksDeleted(TasksDeleted deleted) {
     for (IAssignedTask task
         : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
-      TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
+      TaskGroup group = groups.get(new GroupKey(task.getTask()));
       if (group != null) {
         group.remove(task.getTaskId());
       }
@@ -246,7 +217,7 @@ public class TaskGroups implements EventSubscriber {
   }
 
   public Iterable<TaskGroup> getGroups() {
-    return ImmutableSet.copyOf(groups.asMap().values());
+    return ImmutableSet.copyOf(groups.values());
   }
 
   static class GroupKey {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 263235c..f7f418a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -79,15 +79,10 @@ interface TaskScheduler extends EventSubscriber {
    * Attempts to schedule a task, possibly performing irreversible actions.
    *
    * @param taskId The task to attempt to schedule.
-   * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
-   * again if TRY_AGAIN is returned.
+   * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
+   *         call schedule again if {@code false} is returned.
    */
-  TaskSchedulerResult schedule(String taskId);
-
-  enum TaskSchedulerResult {
-    SUCCESS,
-    TRY_AGAIN
-  }
+  boolean schedule(String taskId);
 
   /**
    * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -183,12 +178,12 @@ interface TaskScheduler extends EventSubscriber {
 
     @Timed("task_schedule_attempt")
     @Override
-    public TaskSchedulerResult schedule(final String taskId) {
+    public boolean schedule(final String taskId) {
       scheduleAttemptsFired.incrementAndGet();
       try {
-        return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
+        return storage.write(new MutateWork.Quiet<Boolean>() {
           @Override
-          public TaskSchedulerResult apply(MutableStoreProvider store) {
+          public Boolean apply(MutableStoreProvider store) {
             LOG.fine("Attempting to schedule task " + taskId);
             final IScheduledTask task = Iterables.getOnlyElement(
                 store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
@@ -202,7 +197,7 @@ interface TaskScheduler extends EventSubscriber {
                 if (!offerQueue.launchFirst(getAssignerFunction(aggregate, taskId, task))) {
                   // Task could not be scheduled.
                   maybePreemptFor(taskId, aggregate);
-                  return TaskSchedulerResult.TRY_AGAIN;
+                  return false;
                 }
               } catch (OfferQueue.LaunchException e) {
                 LOG.log(Level.WARNING, "Failed to launch task.", e);
@@ -217,7 +212,7 @@ interface TaskScheduler extends EventSubscriber {
               }
             }
 
-            return TaskSchedulerResult.SUCCESS;
+            return true;
           }
         });
       } catch (RuntimeException e) {
@@ -225,7 +220,7 @@ interface TaskScheduler extends EventSubscriber {
         // if there is a transient issue resulting in an unchecked exception.
         LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
         scheduleAttemptsFailed.incrementAndGet();
-        return TaskSchedulerResult.TRY_AGAIN;
+        return false;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
new file mode 100644
index 0000000..e23ab5c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class TaskGroupsTest extends EasyMockTest {
+
+  private ScheduledExecutorService executor;
+  private BackoffStrategy backoffStrategy;
+  private TaskScheduler taskScheduler;
+  private RescheduleCalculator rescheduleCalculator;
+
+  private TaskGroups taskGroups;
+
+  @Before
+  public void setUp() throws Exception {
+    executor = createMock(ScheduledExecutorService.class);
+    backoffStrategy = createMock(BackoffStrategy.class);
+    taskScheduler = createMock(TaskScheduler.class);
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
+    taskGroups = new TaskGroups(
+        executor,
+        backoffStrategy,
+        RateLimiter.create(10000),
+        taskScheduler,
+        rescheduleCalculator);
+  }
+
+  @Test
+  public void testEvaluatedImmediately() {
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(0L);
+    executor.schedule(EasyMock.<Runnable>anyObject(), EasyMock.eq(0L), EasyMock.eq(MILLISECONDS));
+    expectLastCall().andAnswer(new IAnswer<ScheduledFuture<Void>>() {
+      @Override
+      public ScheduledFuture<Void> answer() {
+        ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+        return null;
+      }
+    });
+    expect(taskScheduler.schedule("a")).andReturn(true);
+
+    control.replay();
+
+    taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT));
+  }
+
+  private Capture<Runnable> expectEvaluate() {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(EasyMock.capture(capture), EasyMock.eq(0L), EasyMock.eq(MILLISECONDS));
+    expectLastCall().andReturn(null);
+    return capture;
+  }
+
+  @Test
+  public void testTaskDeletedBeforeEvaluating() {
+    final IScheduledTask task = makeTask("a");
+
+    expect(backoffStrategy.calculateBackoffMs(0)).andReturn(0L).atLeastOnce();
+    Capture<Runnable> evaluate = expectEvaluate();
+
+    expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer<Boolean>() {
+      @Override
+      public Boolean answer() {
+        // Test a corner case where a task is deleted while it is being evaluated by the task
+        // scheduler.  If not handled carefully, this could result in the scheduler trying again
+        // later to satisfy the deleted task.
+        taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
+
+        return false;
+      }
+    });
+
+    control.replay();
+
+    taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT));
+    evaluate.getValue().run();
+  }
+
+  private static IScheduledTask makeTask(String id) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(ScheduleStatus.PENDING)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(id)
+            .setTask(new TaskConfig()
+                .setOwner(new Identity("owner", "owner"))
+                .setEnvironment("test")
+                .setJobName("job"))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 65e00f7..5bcd7a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -56,11 +56,11 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.SUCCESS;
-import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.TRY_AGAIN;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TaskSchedulerImplTest extends EasyMockTest {
 
@@ -150,14 +150,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
-    assertEquals(TRY_AGAIN, scheduler.schedule("b"));
+    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("b"));
 
     assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(OFFER));
 
     clock.advance(reservationDuration);
 
-    assertEquals(SUCCESS, scheduler.schedule("b"));
+    assertTrue(scheduler.schedule("b"));
 
     assertEquals(true, secondAssignment.getValue().apply(OFFER).isPresent());
   }
@@ -187,12 +187,12 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();
-    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
-    assertEquals(SUCCESS, scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule("a"));
     firstAssignment.getValue().apply(OFFER);
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     clock.advance(halfReservationDuration);
-    assertEquals(SUCCESS, scheduler.schedule("b"));
+    assertTrue(scheduler.schedule("b"));
     secondAssignment.getValue().apply(OFFER);
   }
 
@@ -212,9 +212,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAssigned(TASK_A);
 
     control.replay();
-    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("a"));
     clock.advance(halfReservationDuration);
-    assertEquals(SUCCESS, scheduler.schedule("a"));
+    assertTrue(scheduler.schedule("a"));
 
     firstAssignment.getValue().apply(OFFER);
   }
@@ -237,11 +237,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAssigned(TASK_B);
 
     control.replay();
-    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("a"));
     clock.advance(halfReservationDuration);
     // Task is killed by user before it is scheduled
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
-    assertEquals(SUCCESS, scheduler.schedule("b"));
+    assertTrue(scheduler.schedule("b"));
     assignment.getValue().apply(OFFER);
   }
 
@@ -262,10 +262,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectAssigned(TASK_A);
 
     control.replay();
-    assertEquals(TRY_AGAIN, scheduler.schedule("b"));
+    assertFalse(scheduler.schedule("b"));
     // We don't act on the reservation made by b because we want to see timeout behaviour.
     clock.advance(reservationDuration);
-    assertEquals(SUCCESS, scheduler.schedule("a"));
+    assertTrue(scheduler.schedule("a"));
     firstAssignment.getValue().apply(OFFER);
   }
 
@@ -298,7 +298,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(SUCCESS, scheduler.schedule(Tasks.id(taskA)));
+    assertTrue(scheduler.schedule(Tasks.id(taskA)));
     assignment.getValue().apply(OFFER);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index ce03abc..bf1391e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -41,7 +41,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -106,7 +105,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   private OfferQueue offerQueue;
   private TaskGroups taskGroups;
   private FakeClock clock;
-  private BackoffStrategy flappingStrategy;
+  private RescheduleCalculator rescheduleCalculator;
   private Preemptor preemptor;
   private AttributeAggregate emptyJob;
   private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
@@ -124,7 +123,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     returnDelay = createMock(OfferReturnDelay.class);
     clock = new FakeClock();
     clock.setNowMillis(0);
-    flappingStrategy = createMock(BackoffStrategy.class);
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
     preemptor = createMock(Preemptor.class);
     emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -134,8 +133,6 @@ public class TaskSchedulerTest extends EasyMockTest {
   private void replayAndCreateScheduler() {
     control.replay();
     offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenance);
-    RateLimiter rateLimiter = RateLimiter.create(1);
-    Amount<Long, Time> flappingThreshold = Amount.of(5L, Time.MINUTES);
     TaskScheduler scheduler = new TaskSchedulerImpl(storage,
         stateManager,
         assigner,
@@ -146,16 +143,9 @@ public class TaskSchedulerTest extends EasyMockTest {
     taskGroups = new TaskGroups(
         executor,
         retryStrategy,
-        rateLimiter,
+        RateLimiter.create(100),
         scheduler,
-        clock,
-        // TODO(wfarner): Use a mock rather than impl here.
-        new RescheduleCalculatorImpl(
-            storage,
-            new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
-                flappingStrategy,
-                flappingThreshold,
-                Amount.of(5, Time.SECONDS))));
+        rescheduleCalculator);
   }
 
   private Capture<Runnable> expectOffer() {
@@ -250,13 +240,15 @@ public class TaskSchedulerTest extends EasyMockTest {
 
   @Test
   public void testLoadFromStorage() {
-    expectTaskGroupBackoff(10);
-
-    replayAndCreateScheduler();
-
     final IScheduledTask a = makeTask("a", KILLED);
     final IScheduledTask b = makeTask("b", PENDING);
     final IScheduledTask c = makeTask("c", RUNNING);
+
+    expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
+    expectTaskRetryIn(10);
+
+    replayAndCreateScheduler();
+
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       protected void execute(MutableStoreProvider store) {