You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/14 21:17:14 UTC

git commit: When rescheduling a task, send it to the THROTTLED state if it has been penalized for flapping.

Updated Branches:
  refs/heads/master e1aee67b7 -> a584410c4


When rescheduling a task, send it to the THROTTLED state if it has been
penalized for flapping.

Bugs closed: AURORA-23

Reviewed at https://reviews.apache.org/r/16740/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a584410c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a584410c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a584410c

Branch: refs/heads/master
Commit: a584410c49d615bea6198bed6f0c5ec52c411b0b
Parents: e1aee67
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 14 12:14:54 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 14 12:14:54 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  18 +-
 .../aurora/scheduler/async/Preemptor.java       |   2 +-
 .../scheduler/async/RescheduleCalculator.java   |  47 ++---
 .../aurora/scheduler/async/TaskGroup.java       |   1 -
 .../aurora/scheduler/async/TaskGroups.java      |   9 +-
 .../aurora/scheduler/async/TaskThrottler.java   |  87 +++++++++
 .../org/apache/aurora/scheduler/base/Tasks.java |   7 +-
 .../aurora/scheduler/events/PubsubEvent.java    |  41 ----
 .../aurora/scheduler/http/SchedulerzJob.java    |  24 +--
 .../aurora/scheduler/http/SchedulerzRole.java   |   4 +-
 .../scheduler/state/StateManagerImpl.java       |  39 ++--
 .../scheduler/state/TaskStateMachine.java       |  27 ++-
 .../scheduler/storage/StorageBackfill.java      |  19 --
 .../thrift/org/apache/aurora/gen/api.thrift     |   3 +-
 .../async/RescheduleCalculatorImplTest.java     | 189 +++++++++++++++++++
 .../scheduler/async/TaskSchedulerTest.java      | 149 +--------------
 .../scheduler/async/TaskThrottlerTest.java      | 137 ++++++++++++++
 .../state/BaseSchedulerCoreImplTest.java        | 144 +++++++-------
 .../scheduler/state/StateManagerImplTest.java   |  36 +++-
 .../scheduler/state/TaskStateMachineTest.java   |  11 ++
 .../scheduler/storage/StorageBackfillTest.java  |  92 ---------
 21 files changed, 619 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 6be658d..72d3621 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -162,7 +162,7 @@ public class AsyncModule extends AbstractModule {
 
     // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
     // a MultiBinder, which cannot span multiple injectors.
-    binder().install(new PrivateModule() {
+    install(new PrivateModule() {
       @Override protected void configure() {
         bind(new TypeLiteral<Amount<Long, Time>>() { })
             .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
@@ -175,7 +175,7 @@ public class AsyncModule extends AbstractModule {
     });
     PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
 
-    binder().install(new PrivateModule() {
+    install(new PrivateModule() {
       @Override protected void configure() {
         bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
             new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
@@ -188,6 +188,7 @@ public class AsyncModule extends AbstractModule {
                 MAX_RESCHEDULING_DELAY.get()));
 
         bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
+        expose(RescheduleCalculator.class);
         if (ENABLE_PREEMPTOR.get()) {
           bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
           bind(PreemptorImpl.class).in(Singleton.class);
@@ -206,7 +207,7 @@ public class AsyncModule extends AbstractModule {
     bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
-    binder().install(new PrivateModule() {
+    install(new PrivateModule() {
       @Override protected void configure() {
         bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
         bind(ScheduledExecutorService.class).toInstance(executor);
@@ -217,7 +218,7 @@ public class AsyncModule extends AbstractModule {
     });
     PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
 
-    binder().install(new PrivateModule() {
+    install(new PrivateModule() {
       @Override protected void configure() {
         // TODO(ksweeney): Create a configuration validator module so this can be injected.
         // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
@@ -231,6 +232,15 @@ public class AsyncModule extends AbstractModule {
       }
     });
     PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+
+    install(new PrivateModule() {
+      @Override protected void configure() {
+        bind(ScheduledExecutorService.class).toInstance(executor);
+        bind(TaskThrottler.class).in(Singleton.class);
+        expose(TaskThrottler.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 0afbef9..b190a00 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -121,7 +121,7 @@ public interface Preemptor {
 
     private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
       @Override public boolean apply(IScheduledTask task) {
-        return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
+        return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
             >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index 0265bf9..a9ee32b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.Random;
 
 import org.apache.aurora.gen.ScheduleStatus;
@@ -51,32 +50,31 @@ import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 /**
  * Calculates scheduling delays for tasks.
  */
-interface RescheduleCalculator {
+public interface RescheduleCalculator {
   /**
-   * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
+   * Calculates the delay, in milliseconds, before the task should be considered eligible for
+   * (re)scheduling at scheduler startup.
    *
-   * @param task Task to calculate timestamp for.
-   * @return Timestamp in msec.
+   * @param task Task to calculate delay for.
+   * @return Delay in msec.
    */
-  long getStartupReadyTimeMs(IScheduledTask task);
+  long getStartupScheduleDelayMs(IScheduledTask task);
 
   /**
-   * Gets a timestamp for the task to become eligible for (re)scheduling.
+   * Calculates the penalty, in milliseconds, that a task should be penalized before being
+   * eligible for rescheduling.
    *
-   * @param task Task to calculate timestamp for.
-   * @return Timestamp in msec.
+   * @param task Task to calculate delay for.
+   * @return Delay in msec.
    */
-  long getReadyTimeMs(IScheduledTask task);
+  long getFlappingPenaltyMs(IScheduledTask task);
 
-  // TODO(wfarner): Create a unit test for this class.  It currently piggybacks on
-  // TaskSchedulerTest.  Once a unit test exists, TaskSchedulerTest should use a mock.
   class RescheduleCalculatorImpl implements RescheduleCalculator {
 
     private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
 
     private final Storage storage;
     private final RescheduleCalculatorSettings settings;
-    private final Clock clock;
     // TODO(wfarner): Inject 'random' in the constructor for better test coverage.
     private final Random random = new Random.SystemRandom(new java.util.Random());
 
@@ -138,25 +136,15 @@ interface RescheduleCalculator {
     }
 
     @Inject
-    RescheduleCalculatorImpl(
-        Storage storage,
-        RescheduleCalculatorSettings settings,
-        Clock clock) {
-
+    RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) {
       this.storage = checkNotNull(storage);
       this.settings = checkNotNull(settings);
-      this.clock = checkNotNull(clock);
     }
 
     @Override
-    public long getStartupReadyTimeMs(IScheduledTask task) {
-      return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
-          + getTaskReadyTimestamp(task);
-    }
-
-    @Override
-    public long getReadyTimeMs(IScheduledTask task) {
-      return getTaskReadyTimestamp(task);
+    public long getStartupScheduleDelayMs(IScheduledTask task) {
+      return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue())
+          + getFlappingPenaltyMs(task);
     }
 
     private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
@@ -170,7 +158,8 @@ interface RescheduleCalculator {
       return Optional.fromNullable(Iterables.getOnlyElement(res, null));
     }
 
-    private long getTaskReadyTimestamp(IScheduledTask task) {
+    @Override
+    public long getFlappingPenaltyMs(IScheduledTask task) {
       Optional<IScheduledTask> curTask = getTaskAncestor(task);
       long penaltyMs = 0;
       while (curTask.isPresent() && flapped.apply(curTask.get())) {
@@ -185,7 +174,7 @@ interface RescheduleCalculator {
         curTask = getTaskAncestor(curTask.get());
       }
 
-      return penaltyMs + clock.nowMillis();
+      return penaltyMs;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
index 1a00874..a834f44 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -115,7 +115,6 @@ class TaskGroup {
     return key.toString();
   }
 
-  // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
   public Set<String> getTaskIds() {
     return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index b50c625..702b0df 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -201,8 +201,9 @@ public class TaskGroups implements EventSubscriber {
     return executor;
   }
 
-  private synchronized void add(IAssignedTask task, long readyTimestamp) {
-    groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
+  private synchronized void add(IAssignedTask task, long scheduleDelayMs) {
+    groups.getUnchecked(new GroupKey(task.getTask()))
+        .push(task.getTaskId(), clock.nowMillis() + scheduleDelayMs);
   }
 
   /**
@@ -218,8 +219,8 @@ public class TaskGroups implements EventSubscriber {
     if (stateChange.getNewState() == PENDING) {
       IScheduledTask task = stateChange.getTask();
       long readyAtMs = stateChange.isTransition()
-          ? rescheduleCalculator.getReadyTimeMs(task)
-          : rescheduleCalculator.getStartupReadyTimeMs(task);
+          ? rescheduleCalculator.getFlappingPenaltyMs(task)
+          : rescheduleCalculator.getStartupScheduleDelayMs(task);
       add(task.getAssignedTask(), readyAtMs);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
new file mode 100644
index 0000000..11a01ea
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+
+/**
+ * A holding area for tasks that have been throttled.  Tasks entering the
+ * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED} state will be transitioned to
+ * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} after the penalty period (as dictated by
+ * {@link RescheduleCalculator}) has expired.
+ */
+class TaskThrottler implements EventSubscriber {
+
+  private final RescheduleCalculator rescheduleCalculator;
+  private final Clock clock;
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+
+  private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
+
+  @Inject
+  TaskThrottler(
+      RescheduleCalculator rescheduleCalculator,
+      Clock clock,
+      ScheduledExecutorService executor,
+      StateManager stateManager) {
+
+    this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+    this.clock = checkNotNull(clock);
+    this.executor = checkNotNull(executor);
+    this.stateManager = checkNotNull(stateManager);
+  }
+
+  @Subscribe
+  public void taskChangedState(final TaskStateChange stateChange) {
+    if (stateChange.getNewState() == THROTTLED) {
+      long readyAtMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp()
+          + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
+      long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
+      throttleStats.accumulate(delayMs);
+      executor.schedule(
+          new Runnable() {
+            @Override public void run() {
+              stateManager.changeState(
+                  Query.taskScoped(stateChange.getTaskId()).byStatus(THROTTLED),
+                  PENDING,
+                  Optional.<String>absent());
+            }
+          },
+          delayMs,
+          TimeUnit.MILLISECONDS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 569e8c3..06a19d8 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -37,6 +37,7 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 
 /**
  * Utility class providing convenience functions relating to tasks.
@@ -177,10 +178,14 @@ public final class Tasks {
     return task.getAssignedTask().getTask().getJobName();
   }
 
+  public static ITaskEvent getLatestEvent(IScheduledTask task) {
+    return Iterables.getLast(task.getTaskEvents());
+  }
+
   public static final Ordering<IScheduledTask> LATEST_ACTIVITY = Ordering.natural()
       .onResultOf(new Function<IScheduledTask, Long>() {
         @Override public Long apply(IScheduledTask task) {
-          return Iterables.getLast(task.getTaskEvents()).getTimestamp();
+          return getLatestEvent(task).getTimestamp();
         }
       });
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 59e18ea..2669781 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -222,47 +222,6 @@ public interface PubsubEvent {
     }
   }
 
-  public static class TaskRescheduled implements PubsubEvent {
-    private final String role;
-    private final String job;
-    private final int instance;
-
-    public TaskRescheduled(String role, String job, int instance) {
-      this.role = role;
-      this.job = job;
-      this.instance = instance;
-    }
-
-    public String getRole() {
-      return role;
-    }
-
-    public String getJob() {
-      return job;
-    }
-
-    public int getInstance() {
-      return instance;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof TaskRescheduled)) {
-        return false;
-      }
-
-      TaskRescheduled other = (TaskRescheduled) o;
-      return Objects.equal(role, other.role)
-          && Objects.equal(job, other.job)
-          && Objects.equal(instance, other.instance);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(role, job, instance);
-    }
-  }
-
   public static class DriverRegistered implements PubsubEvent {
     @Override
     public boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
index 07a648f..1e0904f 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
@@ -17,7 +17,6 @@ package org.apache.aurora.scheduler.http;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +64,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
-import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
@@ -103,24 +101,6 @@ public class SchedulerzJob extends JerseyTemplateServlet {
         .put(FAILED, EnumSet.of(LOST, FAILED))
       .build();
 
-  private static final Comparator<IScheduledTask> REVERSE_CHRON_COMPARATOR =
-      new Comparator<IScheduledTask>() {
-        @Override public int compare(IScheduledTask taskA, IScheduledTask taskB) {
-          // Sort in reverse chronological order.
-          Iterable<ITaskEvent> taskAEvents = taskA.getTaskEvents();
-          Iterable<ITaskEvent> taskBEvents = taskB.getTaskEvents();
-
-          boolean taskAHasEvents = taskAEvents != null && !Iterables.isEmpty(taskAEvents);
-          boolean taskBHasEvents = taskBEvents != null && !Iterables.isEmpty(taskBEvents);
-          if (taskAHasEvents && taskBHasEvents) {
-            return Long.signum(Iterables.getLast(taskBEvents).getTimestamp()
-                - Iterables.getLast(taskAEvents).getTimestamp());
-          } else {
-            return 0;
-          }
-        }
-      };
-
   private static final Function<Veto, String> GET_REASON = new Function<Veto, String>() {
     @Override public String apply(Veto veto) {
       return veto.getReason();
@@ -165,7 +145,7 @@ public class SchedulerzJob extends JerseyTemplateServlet {
             .put("instanceId", task.getInstanceId())
             .put("slaveHost", task.isSetSlaveHost() ? task.getSlaveHost() : "")
             .put("status", scheduledTask.getStatus())
-            .put("statusTimestamp", Iterables.getLast(scheduledTask.getTaskEvents()).getTimestamp())
+            .put("statusTimestamp", Tasks.getLatestEvent(scheduledTask).getTimestamp())
             .put("taskEvents", scheduledTask.getTaskEvents());
 
           if (scheduledTask.getStatus() == ScheduleStatus.PENDING) {
@@ -411,7 +391,7 @@ public class SchedulerzJob extends JerseyTemplateServlet {
         if (completedQuery.isPresent()) {
           List<IScheduledTask> completedTasks = Lists.newArrayList(
               Storage.Util.weaklyConsistentFetchTasks(storage, completedQuery.get()));
-          Collections.sort(completedTasks, REVERSE_CHRON_COMPARATOR);
+          Collections.sort(completedTasks, Tasks.LATEST_ACTIVITY.reverse());
           template.setAttribute("completedTasks",
               ImmutableList.copyOf(
                   Iterables.transform(offsetAndLimit(completedTasks, offset), taskToStringMap)));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
index cef0ff2..785efd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
@@ -294,9 +294,7 @@ public class SchedulerzRole extends JerseyTemplateServlet {
                 case UNKNOWN:
                   job.failedTaskCount++;
                   Date now = new Date();
-                  long elapsedMillis = now.getTime()
-                      - Iterables.getLast(task.getTaskEvents()).getTimestamp();
-
+                  long elapsedMillis = now.getTime() - Tasks.getLatestEvent(task).getTimestamp();
                   if (Amount.of(elapsedMillis, Time.MILLISECONDS).as(Time.HOURS) < 6) {
                     job.recentlyFailedTaskCount++;
                   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index b6dd537..256f830 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -46,6 +46,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -68,6 +69,7 @@ import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
 import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
 
@@ -82,14 +84,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  private final SideEffectStorage storage;
   @VisibleForTesting
   SideEffectStorage getStorage() {
     return storage;
   }
 
-  private final TaskIdGenerator taskIdGenerator;
-
   // Work queue to receive state machine side effect work.
   // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
   // a delete is always processed after a state transition.
@@ -129,8 +128,11 @@ public class StateManagerImpl implements StateManager {
         }
       };
 
-  private final Driver driver;
+  private final SideEffectStorage storage;
   private final Clock clock;
+  private final Driver driver;
+  private final TaskIdGenerator taskIdGenerator;
+  private final RescheduleCalculator rescheduleCalculator;
 
   /**
    * An item of work on the work queue.
@@ -157,7 +159,8 @@ public class StateManagerImpl implements StateManager {
       final Clock clock,
       Driver driver,
       TaskIdGenerator taskIdGenerator,
-      EventSink eventSink) {
+      EventSink eventSink,
+      RescheduleCalculator rescheduleCalculator) {
 
     checkNotNull(storage);
     this.clock = checkNotNull(clock);
@@ -171,6 +174,7 @@ public class StateManagerImpl implements StateManager {
     this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
+    this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
 
     Stats.exportSize("work_queue_depth", workQueue);
   }
@@ -334,8 +338,9 @@ public class StateManagerImpl implements StateManager {
 
         switch (work.command) {
           case RESCHEDULE:
-            ScheduledTask builder =
-                Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
+            IScheduledTask ancestor = Iterables.getOnlyElement(taskStore.fetchTasks(idQuery));
+
+            ScheduledTask builder = ancestor.newBuilder();
             builder.getAssignedTask().unsetSlaveId();
             builder.getAssignedTask().unsetSlaveHost();
             builder.getAssignedTask().unsetAssignedPorts();
@@ -351,13 +356,19 @@ public class StateManagerImpl implements StateManager {
             IScheduledTask task = IScheduledTask.build(builder);
             taskStore.saveTasks(ImmutableSet.of(task));
 
-            createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
-            ITaskConfig taskInfo = task.getAssignedTask().getTask();
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskRescheduled(
-                    taskInfo.getOwner().getRole(),
-                    taskInfo.getJobName(),
-                    task.getAssignedTask().getInstanceId()));
+            ScheduleStatus newState;
+            String auditMessage;
+            long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(ancestor);
+            if (flapPenaltyMs > 0) {
+              newState = THROTTLED;
+              auditMessage =
+                  String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+            } else {
+              newState = PENDING;
+              auditMessage = "Rescheduled";
+            }
+
+            createStateMachine(task).updateState(newState, Optional.of(auditMessage));
             break;
 
           case UPDATE_STATE:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..11d283d 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -77,6 +77,7 @@ class TaskStateMachine {
   private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
   private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
   private static final State STARTING = State.create(ScheduleStatus.STARTING);
+  private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
   private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
 
   @VisibleForTesting
@@ -263,29 +264,23 @@ class TaskStateMachine {
       }
     };
 
+    final Closure<Transition<State>> deleteIfKilling =
+        Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+
     stateMachine = StateMachine.<State>builder(taskId)
         .logTransitions()
         .initialState(State.create(initialState))
         .addState(
             Rule.from(INIT)
-                .to(PENDING, UNKNOWN))
+                .to(PENDING, THROTTLED, UNKNOWN))
         .addState(
             Rule.from(PENDING)
                 .to(ASSIGNED, KILLING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case KILLING:
-                            addWork(WorkCommand.DELETE);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
-                      }
-                    }
-                ))
+                .withCallback(deleteIfKilling))
+        .addState(
+            Rule.from(THROTTLED)
+            .to(PENDING, KILLING)
+            .withCallback(deleteIfKilling))
         .addState(
             Rule.from(ASSIGNED)
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
@@ -552,7 +547,7 @@ class TaskStateMachine {
    * @param mutation Mutate operation to perform while updating the task.
    * @return {@code true} if the state change was allowed, {@code false} otherwise.
    */
-  public synchronized boolean updateState(
+  private synchronized boolean updateState(
       final ScheduleStatus status,
       Function<IScheduledTask, IScheduledTask> mutation,
       final Optional<String> auditMessage) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
index 69374ca..3ce5bd3 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -101,24 +101,6 @@ public final class StorageBackfill {
     }
   }
 
-  private static final AtomicLong BOTH_FIELDS_SET = Stats.exportLong("both_instance_ids_set");
-  private static final AtomicLong OLD_FIELD_SET = Stats.exportLong("old_instance_id_set");
-  private static final AtomicLong NEW_FIELD_SET = Stats.exportLong("new_instance_id_set");
-  private static final AtomicLong FIELDS_INCONSISTENT =
-      Stats.exportLong("instance_ids_inconsistent");
-
-  /**
-   * Ensures backwards-compatibility of the throttled state, which exists in this version but is
-   * not handled.
-   *
-   * @param task Task to possibly rewrite.
-   */
-  private static void rewriteThrottledState(ScheduledTask task) {
-    if (ScheduleStatus.THROTTLED == task.getStatus()) {
-      task.setStatus(ScheduleStatus.PENDING);
-    }
-  }
-
   /**
    * Backfills the storage to make it match any assumptions that may have changed since
    * the structs were first written.
@@ -137,7 +119,6 @@ public final class StorageBackfill {
         // TODO(ksweeney): Guarantee tasks pass current validation code here and quarantine if they
         // don't.
         guaranteeShardUniqueness(builder, storeProvider.getUnsafeTaskStore(), clock);
-        rewriteThrottledState(builder);
         return IScheduledTask.build(builder);
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 33c70df..927552a 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -256,7 +256,8 @@ enum ScheduleStatus {
 }
 
 // States that a task may be in while still considered active.
-const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.PENDING,
+const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.THROTTLED,
+                                           ScheduleStatus.PENDING,
                                            ScheduleStatus.ASSIGNED,
                                            ScheduleStatus.STARTING,
                                            ScheduleStatus.RUNNING,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
new file mode 100644
index 0000000..c450276
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class RescheduleCalculatorImplTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Integer, Time> MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES);
+
+  private StorageTestUtil storageUtil;
+  private BackoffStrategy backoff;
+  private RescheduleCalculator rescheduleCalculator;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    backoff = createMock(BackoffStrategy.class);
+    rescheduleCalculator = new RescheduleCalculatorImpl(
+        storageUtil.storage,
+        new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+            backoff,
+            FLAPPING_THRESHOLD,
+            MAX_STARTUP_DELAY));
+    storageUtil.expectOperations();
+  }
+
+  @Test
+  public void testNoPenaltyForNoAncestor() {
+    control.replay();
+
+    assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT)));
+  }
+
+  @Test
+  public void testNoPenaltyDeletedAncestor() {
+    String ancestorId = "a";
+    storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
+
+    control.replay();
+
+    assertEquals(
+        0L,
+        rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId)));
+  }
+
+  @Test
+  public void testFlappingTask() {
+    IScheduledTask ancestor = makeFlappyTask("a");
+    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+    long penaltyMs = 1000L;
+    expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
+
+    control.replay();
+
+    assertEquals(
+        penaltyMs,
+        rescheduleCalculator.getFlappingPenaltyMs(
+            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+  }
+
+  @Test
+  public void testFlappingTasksBackoffTruncation() {
+    // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting
+    // ancestors once truncated.
+    IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried");
+    IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA));
+    IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB));
+    IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC));
+
+    Map<IScheduledTask, Long> ancestorsAndPenalties = ImmutableMap.of(
+        taskD, 100L,
+        taskC, 200L,
+        taskB, 300L,
+        taskA, 300L);
+
+    long lastPenalty = 0L;
+    for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
+      storageUtil.expectTaskFetch(
+          Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
+          taskAndPenalty.getKey());
+      expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
+      lastPenalty = taskAndPenalty.getValue();
+    }
+
+    control.replay();
+
+    IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD));
+    assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask));
+  }
+
+  @Test
+  public void testNoPenaltyForInterruptedTasks() {
+    IScheduledTask ancestor = setEvents(
+        makeTask("a", KILLED),
+        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
+    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+
+    control.replay();
+
+    assertEquals(
+        0L,
+        rescheduleCalculator.getFlappingPenaltyMs(
+            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+  }
+
+  private IScheduledTask makeFlappyTask(String taskId) {
+    return setEvents(
+        makeTask(taskId, FINISHED),
+        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L));
+  }
+
+  private IScheduledTask makeTask(String taskId) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setInstanceId(0)
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setJobName("job-" + taskId)
+                .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
+                .setEnvironment("env-" + taskId))));
+  }
+
+  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status));
+  }
+
+  private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) {
+    return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId));
+  }
+
+  private static final Function<Map.Entry<ScheduleStatus, Long>, TaskEvent> TO_EVENT =
+      new Function<Entry<ScheduleStatus, Long>, TaskEvent>() {
+        @Override public TaskEvent apply(Entry<ScheduleStatus, Long> input) {
+          return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue());
+        }
+      };
+
+  private IScheduledTask setEvents(IScheduledTask task, Map<ScheduleStatus, Long> events) {
+    return IScheduledTask.build(task.newBuilder().setTaskEvents(
+        FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9698f28..4dfac03 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -15,13 +15,10 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.EnumSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -40,7 +37,6 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
@@ -71,14 +67,11 @@ import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
@@ -148,13 +141,13 @@ public class TaskSchedulerTest extends EasyMockTest {
         rateLimiter,
         scheduler,
         clock,
+        // TODO(wfarner): Use a mock rather than impl here.
         new RescheduleCalculatorImpl(
             storage,
             new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
                 flappingStrategy,
                 flappingThreshold,
-                Amount.of(5, Time.SECONDS)),
-            clock));
+                Amount.of(5, Time.SECONDS))));
   }
 
   private Capture<Runnable> expectOffer() {
@@ -590,144 +583,6 @@ public class TaskSchedulerTest extends EasyMockTest {
     timeoutCapture.getValue().run();
   }
 
-  @Test
-  public void testNoPenaltyForNoAncestor() {
-    // If a task doesn't have an ancestor there should be no penality for flapping.
-    expectAnyMaintenanceCalls();
-    IScheduledTask task = makeTask("a1", INIT);
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(1);
-    expectTaskScheduled(task);
-
-    replayAndCreateScheduler();
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(task, INIT, PENDING);
-
-    first.getValue().run();
-  }
-
-  @Test
-  public void testFlappingTasksBackoffTruncation() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTask("a0", null);
-    makeFlappyTask("a1", "a0");
-    makeFlappyTask("a2", "a1");
-    IScheduledTask taskA3 = IScheduledTask.build(makeTask("a3", INIT).newBuilder()
-        .setAncestorId("a2"));
-
-    expectOfferDeclineIn(10);
-
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-    // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
-    // entire history.
-    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
-    expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
-    Capture<Runnable> flapping = expectTaskRetryIn(10);
-
-    expectTaskScheduled(taskA3);
-
-    replayAndCreateScheduler();
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA3, INIT, PENDING);
-
-    first.getValue().run();
-    clock.waitFor(10);
-    flapping.getValue().run();
-  }
-
-  @Test
-  public void testFlappingTasks() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTask("a0", null);
-    IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
-        .setAncestorId("a0"));
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-
-    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
-    // Since A1 has been penalized, the task has to wait for another 10 ms until the penalty has
-    // expired.
-    Capture<Runnable> flapping = expectTaskRetryIn(10);
-
-    expectTaskScheduled(taskA1);
-
-    replayAndCreateScheduler();
-
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA1, INIT, PENDING);
-
-    first.getValue().run();
-    clock.waitFor(10);
-    flapping.getValue().run();
-  }
-
-  @Test
-  public void testNoPenaltyForInterruptedTasks() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTaskWithStates("a0", EnumSet.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED), null);
-    IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
-        .setAncestorId("a0"));
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-
-    expectTaskScheduled(taskA1);
-
-    replayAndCreateScheduler();
-
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA1, INIT, PENDING);
-
-    first.getValue().run();
-  }
-
-  private IScheduledTask makeFlappyTaskWithStates(
-      String taskId,
-      Iterable<ScheduleStatus> states,
-      @Nullable String ancestorId) {
-
-    Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
-
-    ScheduledTask base = makeTask(taskId, INIT).newBuilder();
-
-    for (ScheduleStatus status : states) {
-      base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
-      clock.advance(timeInState);
-    }
-
-    base.setAncestorId(ancestorId);
-
-    final IScheduledTask result = IScheduledTask.build(base);
-
-    // Insert the task if it doesn't already exist.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
-          taskStore.saveTasks(ImmutableSet.of(result));
-        }
-      }
-    });
-
-    return result;
-  }
-
-  private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
-    return makeFlappyTaskWithStates(
-        taskId,
-        EnumSet.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
-        ancestorId);
-  }
-
   private TaskInfo makeTaskInfo(IScheduledTask task) {
     return TaskInfo.newBuilder()
         .setName(Tasks.id(task))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
new file mode 100644
index 0000000..66bc2a0
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
+public class TaskThrottlerTest extends EasyMockTest {
+
+  private RescheduleCalculator rescheduleCalculator;
+  private FakeClock clock;
+  private ScheduledExecutorService executor;
+  private StateManager stateManager;
+  private TaskThrottler throttler;
+
+  @Before
+  public void setUp() throws Exception {
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
+    clock = new FakeClock();
+    executor = createMock(ScheduledExecutorService.class);
+    stateManager = createMock(StateManager.class);
+    throttler = new TaskThrottler(rescheduleCalculator, clock, executor, stateManager);
+  }
+
+  @Test
+  public void testIgnoresNonThrottledTasks() {
+    control.replay();
+
+    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
+    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING));
+  }
+
+  @Test
+  public void testThrottledTask() {
+    IScheduledTask task = makeTask("a", THROTTLED);
+
+    long penaltyMs = 100;
+
+    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+    Capture<Runnable> stateChangeCapture = expectThrottled(penaltyMs);
+    expectMovedToPending(task);
+
+    control.replay();
+
+    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+    stateChangeCapture.getValue().run();
+  }
+
+  @Test
+  public void testThrottledTaskReady() {
+    // Ensures that a sane delay is used when the task's penalty was already expired when
+    // the -> THROTTLED transition occurred (such as in the event of a scheduler failover).
+
+    IScheduledTask task = makeTask("a", THROTTLED);
+
+    long penaltyMs = 100;
+
+    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+    Capture<Runnable> stateChangeCapture = expectThrottled(0);
+    expectMovedToPending(task);
+
+    control.replay();
+
+    clock.advance(Amount.of(1L, Time.HOURS));
+    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+    stateChangeCapture.getValue().run();
+  }
+
+  private Capture<Runnable> expectThrottled(long penaltyMs) {
+    Capture<Runnable> stateChangeCapture = createCapture();
+    expect(executor.schedule(
+        capture(stateChangeCapture),
+        eq(penaltyMs),
+        eq(TimeUnit.MILLISECONDS)))
+        .andReturn(null);
+    return stateChangeCapture;
+  }
+
+  private void expectMovedToPending(IScheduledTask task) {
+    expect(stateManager.changeState(
+        Query.taskScoped(Tasks.id(task)).byStatus(THROTTLED),
+        PENDING,
+        Optional.<String>absent()))
+        .andReturn(1);
+  }
+
+  private IScheduledTask makeTask(String id, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setTaskEvents(ImmutableList.of(
+            new TaskEvent()
+                .setStatus(status)
+                .setTimestamp(clock.nowMillis())))
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask().setTaskId(id)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 720d0c8..cc929f8 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -59,6 +59,7 @@ import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
@@ -75,7 +76,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.StorageBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IIdentity;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -83,6 +83,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -137,6 +138,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private CronJobManager cron;
   private FakeClock clock;
   private EventSink eventSink;
+  private RescheduleCalculator rescheduleCalculator;
   private ShutdownRegistry shutdownRegistry;
   private JobFilter jobFilter;
 
@@ -154,6 +156,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     clock = new FakeClock();
     eventSink = createMock(EventSink.class);
     eventSink.post(EasyMock.<PubsubEvent>anyObject());
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
     cronScheduler = createMock(CronScheduler.class);
     shutdownRegistry = createMock(ShutdownRegistry.class);
     jobFilter = createMock(JobFilter.class);
@@ -184,12 +187,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private void buildScheduler(Storage newStorage) throws Exception {
     this.storage = newStorage;
     storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
         StorageBackfill.backfill(storeProvider, clock);
       }
     });
 
-    stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+    stateManager = new StateManagerImpl(
+        storage,
+        clock,
+        driver,
+        taskIdGenerator,
+        eventSink,
+        rescheduleCalculator);
     ImmediateJobManager immediateManager = new ImmediateJobManager(stateManager, storage);
     cron = new CronJobManager(stateManager, storage, cronScheduler, shutdownRegistry);
     scheduler = new SchedulerCoreImpl(
@@ -412,16 +422,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     }
   }
 
-  @Test
-  public void testSortableTaskIds() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    for (IScheduledTask task : getTasks(Query.unscoped())) {
-      assertEquals(IIdentity.build(OWNER_A), task.getAssignedTask().getTask().getOwner());
-    }
-  }
-
   @Test(expected = ScheduleException.class)
   public void testCreateDuplicateJob() throws Exception {
     control.replay();
@@ -577,7 +577,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     assertTaskCount(1);
     assertEquals(PENDING, getTask(taskId).getStatus());
 
-    changeStatus(Query.taskScoped(taskId), ASSIGNED);
+    changeStatus(taskId, ASSIGNED);
 
     scheduler.startCronJob(KEY_A);
     assertTaskCount(2);
@@ -649,20 +649,28 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     }
   }
 
+  private IExpectationSetters<Long> expectTaskNotThrottled() {
+    return expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+        .andReturn(0L);
+  }
+
   @Test
   public void testServiceTasksRescheduled() throws Exception {
+    int numServiceTasks = 5;
+
+    expectTaskNotThrottled().times(numServiceTasks);
+
     control.replay();
     buildScheduler();
 
     // Schedule 5 service and 5 non-service tasks.
-    scheduler.createJob(makeJob(KEY_A, 5));
+    scheduler.createJob(makeJob(KEY_A, numServiceTasks));
     TaskConfig task = productionTask().setIsService(true);
     scheduler.createJob(
         makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
 
     assertEquals(10, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
     assertEquals(10, getTasksByStatus(STARTING).size());
 
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
@@ -683,11 +691,14 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testServiceTaskIgnoresMaxFailures() throws Exception {
+    int totalFailures = 10;
+
+    expectTaskNotThrottled().times(totalFailures);
+
     control.replay();
     buildScheduler();
 
     int maxFailures = 5;
-    int totalFailures = 10;
 
     // Schedule a service task.
     TaskConfig task = productionTask()
@@ -701,9 +712,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       String taskId = Tasks.id(
           getOnlyTask(Query.jobScoped(KEY_A).active()));
 
-      changeStatus(taskId, ASSIGNED);
-      changeStatus(taskId, STARTING);
-      changeStatus(taskId, RUNNING);
+      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
       assertEquals(i - 1, getTask(taskId).getFailureCount());
       changeStatus(taskId, FAILED);
 
@@ -718,27 +727,26 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testTaskRescheduleOnKill() throws Exception {
+    int numTasks = 5;  // size of the single job created below
+
+    expectTaskNotThrottled().times(numTasks);  // each KILLED task is rescheduled once
+
     control.replay();
     buildScheduler();
 
-    // Create 5 non-service and 5 service tasks.
-    scheduler.createJob(makeJob(KEY_A, 5));
-    TaskConfig task = productionTask().setIsService(true);
-    scheduler.createJob(
-        makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+    scheduler.createJob(makeJob(KEY_A, numTasks));
 
-    assertEquals(10, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
-    assertEquals(10, getTasksByStatus(STARTING).size());
+    assertEquals(numTasks, getTasksByStatus(PENDING).size());
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
+    assertEquals(numTasks, getTasksByStatus(STARTING).size());
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
-    assertEquals(10, getTasksByStatus(RUNNING).size());
+    assertEquals(numTasks, getTasksByStatus(RUNNING).size());
 
     // All tasks will move back into PENDING state after getting KILLED.
     changeStatus(Query.roleScoped(ROLE_A), KILLED);
     Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
-    assertEquals(10, newTasks.size());
-    assertEquals(10, getTasksByStatus(KILLED).size());
+    assertEquals(numTasks, newTasks.size());
+    assertEquals(numTasks, getTasksByStatus(KILLED).size());
   }
 
   @Test
@@ -749,9 +757,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, 1));
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
-    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING, RUNNING);
     scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
     changeStatus(Query.roleScoped(ROLE_A), KILLED);
 
@@ -765,6 +771,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testFailedTaskIncrementsFailureCount() throws Exception {
     int maxFailures = 5;
+    expectTaskNotThrottled().times(maxFailures - 1);
+
     control.replay();
     buildScheduler();
 
@@ -778,9 +786,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       String taskId = Tasks.id(getOnlyTask(
           Query.jobScoped(KEY_A).active()));
 
-      changeStatus(taskId, ASSIGNED);
-      changeStatus(taskId, STARTING);
-      changeStatus(taskId, RUNNING);
+      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
       assertEquals(i - 1, getTask(taskId).getFailureCount());
       changeStatus(taskId, FAILED);
 
@@ -819,8 +825,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     assertTaskCount(10);
 
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
     assertTaskCount(10);
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
     assertTaskCount(10);
@@ -895,9 +900,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     scheduler.createJob(makeJob(KEY_A, 1));
     String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED);
-    changeStatus(taskId, STARTING);
-    changeStatus(taskId, RUNNING);
+    changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
     scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
     assertEquals(KILLING, getTask(taskId).getStatus());
     assertEquals(1, getTasks(Query.roleScoped(ROLE_A)).size());
@@ -925,29 +928,25 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testLostTaskRescheduled() throws Exception {
     expectKillTask(2);
+    expectTaskNotThrottled().times(2);  // one per LOST transition triggered below
 
     control.replay();
     buildScheduler();
 
-    int maxFailures = 5;
-    TaskConfig task = productionTask().setMaxTaskFailures(maxFailures);
-    scheduler.createJob(makeJob(KEY_A, task, 1));
+    scheduler.createJob(makeJob(KEY_A, 1));
     assertTaskCount(1);
 
     Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
+    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
     assertEquals(1, tasks.size());
 
-    changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
+    changeStatus(taskId, ASSIGNED, LOST);  // the LOST task should be replaced by a new PENDING one
 
-    Query.Builder pendingQuery = Query.unscoped().byStatus(PENDING);
-    changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
-    assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
-    assertTaskCount(2);
+    String newTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
+    assertFalse(newTaskId.equals(taskId));
 
-    changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
-    changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
-    assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
-    assertTaskCount(3);
+    changeStatus(newTaskId, ASSIGNED, LOST);  // lose the replacement as well
+    assertFalse(newTaskId.equals(Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)))));
   }
 
   @Test
@@ -1017,6 +1016,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testSlaveDeletesTasks() throws Exception {
+    expectTaskNotThrottled();
+
     control.replay();
     buildScheduler();
 
@@ -1029,10 +1030,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     assignTask(taskId1, SLAVE_ID, SLAVE_HOST_1);
     assignTask(taskId2, SLAVE_ID, SLAVE_HOST_1);
 
-    changeStatus(taskId1, STARTING);
-    changeStatus(taskId1, RUNNING);
-    changeStatus(taskId2, STARTING);
-    changeStatus(taskId2, FINISHED);
+    changeStatus(taskId1, STARTING, RUNNING);
+    changeStatus(taskId2, STARTING, FINISHED);
 
     scheduler.tasksDeleted(ImmutableSet.of(taskId1, taskId2));
 
@@ -1049,13 +1048,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testRestartShards() throws Exception {
     expectKillTask(2);
+    expectTaskNotThrottled().times(2);
 
     control.replay();
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
-    changeStatus(Query.jobScoped(KEY_A), RUNNING);
+    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
     scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
     assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
     assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
@@ -1065,12 +1064,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test(expected = ScheduleException.class)
   public void testRestartNonexistentShard() throws Exception {
+    expectTaskNotThrottled();  // presumably for rescheduling the FINISHED service task
+
     control.replay();
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
-    changeStatus(Query.jobScoped(KEY_A), FINISHED);
+    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);
     scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);
   }
 
@@ -1107,6 +1107,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testPortResourceResetAfterReschedule() throws Exception {
     expectKillTask(1);
+    expectTaskNotThrottled();
 
     control.replay();
     buildScheduler();
@@ -1139,8 +1140,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(makeJob(KEY_A, 1));
 
     String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED);
-    changeStatus(taskId, STARTING);
+    changeStatus(taskId, ASSIGNED, STARTING);
     changeStatus(taskId, FAILED, Optional.of("bad stuff happened"));
 
     String hostname = getLocalHost();
@@ -1456,12 +1456,16 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.setTaskStatus(query, status, message);
   }
 
-  public void changeStatus(Query.Builder query, ScheduleStatus status) {
-    changeStatus(query, status, Optional.<String>absent());
+  public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {
+    // Apply the required first status, then each follow-up status in the order given.
+    changeStatus(query, status, Optional.<String>absent());
+    for (ScheduleStatus followUp : statuses) {
+      changeStatus(query, followUp, Optional.<String>absent());
+    }
   }
 
-  public void changeStatus(String taskId, ScheduleStatus status) {
-    changeStatus(taskId, status, Optional.<String>absent());
+  public void changeStatus(String taskId, ScheduleStatus status, ScheduleStatus... statuses) {
+    changeStatus(Query.taskScoped(taskId), status, statuses);  // reuse the query-based overload
   }
 
   public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b17b983..74ec74f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -36,6 +36,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -55,10 +56,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
 import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.easymock.EasyMock.expect;
@@ -75,6 +78,7 @@ public class StateManagerImplTest extends EasyMockTest {
   private Driver driver;
   private TaskIdGenerator taskIdGenerator;
   private EventSink eventSink;
+  private RescheduleCalculator rescheduleCalculator;
   private StateManagerImpl stateManager;
   private final FakeClock clock = new FakeClock();
   private Storage storage;
@@ -84,9 +88,16 @@ public class StateManagerImplTest extends EasyMockTest {
     taskIdGenerator = createMock(TaskIdGenerator.class);
     driver = createMock(Driver.class);
     eventSink = createMock(EventSink.class);
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
     // TODO(William Farner): Use a mocked storage.
     storage = MemStorage.newEmptyStorage();
-    stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+    stateManager = new StateManagerImpl(
+        storage,
+        clock,
+        driver,
+        taskIdGenerator,
+        eventSink,
+        rescheduleCalculator);
   }
 
   @After
@@ -224,7 +235,8 @@ public class StateManagerImplTest extends EasyMockTest {
     // Trigger an event that produces a side-effect and a PubSub event .
     eventSink.post(matchStateChange(id, INIT, PENDING));
     expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override public Void answer() throws Throwable {
+      @Override
+      public Void answer() throws Throwable {
         stateManager.changeState(
             Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
         return null;
@@ -253,6 +265,26 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.deleteTasks(ImmutableSet.of(taskId));
   }
 
+  @Test
+  public void testThrottleTask() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+    String newTaskId = "b";  // ID returned by the second generate() call expected below
+    expect(taskIdGenerator.generate(task, 0)).andReturn(newTaskId);
+    expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+        .andReturn(100L);  // non-zero penalty, so task "b" is expected to go INIT -> THROTTLED
+    expectStateTransitions(newTaskId, INIT, THROTTLED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    changeState(taskId, ASSIGNED);
+    changeState(taskId, RUNNING);
+    changeState(taskId, FAILED);  // expected to trigger the reschedule under test
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index e89e60a..f44ee58 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -46,12 +46,14 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
 import static org.apache.aurora.scheduler.state.WorkCommand.DELETE;
 import static org.apache.aurora.scheduler.state.WorkCommand.INCREMENT_FAILURES;
 import static org.apache.aurora.scheduler.state.WorkCommand.KILL;
 import static org.apache.aurora.scheduler.state.WorkCommand.RESCHEDULE;
 import static org.apache.aurora.scheduler.state.WorkCommand.UPDATE_STATE;
+
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.hamcrest.CoreMatchers.is;
@@ -315,6 +317,15 @@ public class TaskStateMachineTest extends EasyMockTest {
     transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
   }
 
+  @Test
+  public void testThrottledTask() {
+    expectWork(UPDATE_STATE).times(2);  // presumably one per transition requested below
+
+    control.replay();
+
+    transition(stateMachine, THROTTLED, PENDING);
+  }
+
   private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
     for (ScheduleStatus status : states) {
       stateMachine.updateState(status);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
deleted file mode 100644
index 724188b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.LimitConstraint;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskConstraint;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class StorageBackfillTest {
-
-  private Storage storage;
-  private FakeClock clock;
-
-  @Before
-  public void setUp() {
-    storage = MemStorage.newEmptyStorage();
-    clock = new FakeClock();
-  }
-
-  private static IScheduledTask makeTask(String id, int instanceId) {
-
-    TaskConfig config = new TaskConfig()
-        .setOwner(new Identity("user", "role"))
-        .setEnvironment("test")
-        .setJobName("jobName")
-        .setProduction(false)
-        .setConstraints(ImmutableSet.of(
-            new Constraint("host", TaskConstraint.limit(new LimitConstraint(1)))))
-        .setRequestedPorts(ImmutableSet.<String>of())
-        .setMaxTaskFailures(1)
-        .setTaskLinks(ImmutableMap.<String, String>of());
-    ScheduledTask task = new ScheduledTask().setAssignedTask(
-        new AssignedTask().setTask(config));
-    task.getAssignedTask().setTaskId(id);
-    task.getAssignedTask().setInstanceId(instanceId);
-    return IScheduledTask.build(task);
-  }
-
-  @Test
-  public void testRewriteThrottledState() {
-    final IScheduledTask savedTask =
-        IScheduledTask.build(makeTask("id", 0).newBuilder().setStatus(ScheduleStatus.THROTTLED));
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(savedTask));
-        StorageBackfill.backfill(storeProvider, clock);
-      }
-    });
-
-    assertEquals(
-        IScheduledTask.build(savedTask.newBuilder().setStatus(ScheduleStatus.PENDING)),
-        getTask("id"));
-  }
-
-  private IScheduledTask getTask(final String id) {
-    return Iterables.getOnlyElement(
-        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(id)));
-  }
-}