You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2013/12/05 20:58:17 UTC

[5/9] git commit: Adding random jitter on pending task rescheduling after startup

Adding random jitter on pending task rescheduling after startup


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/aa9f826f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/aa9f826f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/aa9f826f

Branch: refs/heads/master
Commit: aa9f826f297e1aa7f92abb8c3972d3d6c0d0e5ea
Parents: ecda2b3
Author: Maxim Khutornenko <mk...@twitter.com>
Authored: Thu Dec 5 08:09:41 2013 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Thu Dec 5 08:09:41 2013 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  23 ++-
 .../scheduler/async/RescheduleCalculator.java   | 188 +++++++++++++++++++
 .../aurora/scheduler/async/TaskGroups.java      | 162 +++-------------
 .../scheduler/async/TaskSchedulerTest.java      |  11 +-
 4 files changed, 239 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
index 37913a8..db07841 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
@@ -30,7 +30,9 @@ import com.google.inject.TypeLiteral;
 
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
+import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import com.twitter.aurora.scheduler.events.PubsubEventModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
@@ -45,8 +47,6 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
 import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
 import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
-import static com.twitter.aurora.scheduler.async.TaskGroups.FlappingTaskSettings;
-import static com.twitter.aurora.scheduler.async.TaskGroups.SchedulingSettings;
 
 /**
  * Binding module for async task management.
@@ -103,6 +103,11 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
       Arg.create(Amount.of(5L, Time.MINUTES));
 
+  @CmdLine(name = "max_reschedule_task_delay_on_startup",
+      help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
+  private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
+      Arg.create(Amount.of(30, Time.SECONDS));
+
   @CmdLine(name = "preemption_delay",
       help = "Time interval after which a pending task becomes eligible to preempt other tasks")
   private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
@@ -148,13 +153,17 @@ public class AsyncModule extends AbstractModule {
 
     binder().install(new PrivateModule() {
       @Override protected void configure() {
-        bind(SchedulingSettings.class).toInstance(new SchedulingSettings(
+        bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
             new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
             RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
-        bind(FlappingTaskSettings.class).toInstance(new FlappingTaskSettings(
-            new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
-            FLAPPING_THRESHOLD.get()
-        ));
+
+        bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+            .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+                new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
+                FLAPPING_THRESHOLD.get(),
+                MAX_RESCHEDULING_DELAY.get()));
+
+        bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
         bind(SchedulingAction.class).to(TaskScheduler.class);
         bind(TaskScheduler.class).in(Singleton.class);
         if (ENABLE_PREEMPTOR.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
new file mode 100644
index 0000000..eefc03a
--- /dev/null
+++ b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+
+/**
+ * Calculates scheduling delays for tasks.
+ */
+interface RescheduleCalculator {
+  /**
+   * Gets a randomly jittered time at which the task may be (re)scheduled after scheduler startup.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getStartupReadyTimeMs(IScheduledTask task);
+
+  /**
+   * Gets a timestamp for the task to become eligible for (re)scheduling.
+   *
+   * @param task Task to calculate timestamp for.
+   * @return Timestamp in msec.
+   */
+  long getReadyTimeMs(IScheduledTask task);
+
+  class RescheduleCalculatorImpl implements RescheduleCalculator {
+
+    private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
+
+    private final Storage storage;
+    private final RescheduleCalculatorSettings settings;
+    private final Clock clock;
+    private final Random random = new Random.SystemRandom(new java.util.Random());
+
+    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
+        Predicates.in(Tasks.ACTIVE_STATES);
+
+    private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
+        new Function<ITaskEvent, ScheduleStatus>() {
+          @Override public ScheduleStatus apply(ITaskEvent input) {
+            return input.getStatus();
+          }
+        };
+
+    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
+        EnumSet.of(RESTARTING, KILLING);
+
+    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        if (!task.isSetTaskEvents()) {
+          return false;
+        }
+
+        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
+
+        // Avoid penalizing tasks that were interrupted by outside action, such as a user
+        // restarting them.
+        if (Iterables.any(Iterables.transform(events, TO_STATUS),
+            Predicates.in(INTERRUPTED_TASK_STATES))) {
+          return false;
+        }
+
+        ITaskEvent terminalEvent = Iterables.get(events, 0);
+        ScheduleStatus terminalState = terminalEvent.getStatus();
+        Preconditions.checkState(Tasks.isTerminated(terminalState));
+
+        ITaskEvent activeEvent =
+            Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+
+        long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
+
+        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
+      }
+    };
+
+    static class RescheduleCalculatorSettings {
+      private final BackoffStrategy flappingTaskBackoff;
+      private final Amount<Long, Time> flappingTaskThreashold;
+      private final Amount<Integer, Time>  maxStartupRescheduleDelay;
+
+      RescheduleCalculatorSettings(
+          BackoffStrategy flappingTaskBackoff,
+          Amount<Long, Time> flappingTaskThreashold,
+          Amount<Integer, Time> maxStartupRescheduleDelay) {
+
+        this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
+        this.flappingTaskThreashold = checkNotNull(flappingTaskThreashold);
+        this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
+      }
+    }
+
+    @Inject
+    RescheduleCalculatorImpl(
+        Storage storage,
+        RescheduleCalculatorSettings settings,
+        Clock clock) {
+
+      this.storage = checkNotNull(storage);
+      this.settings = checkNotNull(settings);
+      this.clock = checkNotNull(clock);
+    }
+
+    @Override
+    public long getStartupReadyTimeMs(IScheduledTask task) {
+      return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
+          + getTaskReadyTimestamp(task);
+    }
+
+    @Override
+    public long getReadyTimeMs(IScheduledTask task) {
+      return getTaskReadyTimestamp(task);
+    }
+
+    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
+      if (!task.isSetAncestorId()) {
+        return Optional.absent();
+      }
+
+      ImmutableSet<IScheduledTask> res =
+          Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+
+      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+    }
+
+    private long getTaskReadyTimestamp(IScheduledTask task) {
+      Optional<IScheduledTask> curTask = getTaskAncestor(task);
+      long penaltyMs = 0;
+      while (curTask.isPresent() && flapped.apply(curTask.get())) {
+        LOG.info(
+            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
+        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
+        // Backoff returned the same penalty, i.e. it is capped; older ancestors cannot raise it.
+        if (newPenalty == penaltyMs) {
+          break;
+        }
+        penaltyMs = newPenalty;
+        curTask = getTaskAncestor(curTask.get());
+      }
+
+      return penaltyMs + clock.nowMillis();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
index 9ea0229..f95f719 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
@@ -15,9 +15,6 @@
  */
 package com.twitter.aurora.scheduler.async;
 
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -25,23 +22,16 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.base.Function;
 import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import com.twitter.aurora.gen.ScheduleStatus;
 import com.twitter.aurora.scheduler.base.JobKeys;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.Tasks;
@@ -53,7 +43,6 @@ import com.twitter.aurora.scheduler.storage.Storage;
 import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
 import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
 import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
@@ -65,9 +54,7 @@ import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
 import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
 import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
 
 /**
@@ -85,51 +72,58 @@ public class TaskGroups implements EventSubscriber {
 
   private final Storage storage;
   private final LoadingCache<GroupKey, TaskGroup> groups;
-  private final Amount<Long, Time> flappingThreshold;
-  private final BackoffStrategy flappingBackoffStrategy;
   private final Clock clock;
+  private final RescheduleCalculator rescheduleCalculator;
   private final Preemptor preemptor;
 
+  static class TaskGroupsSettings {
+    private final BackoffStrategy taskGroupBackoff;
+    private final RateLimiter rateLimiter;
+
+    TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
+      this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
+      this.rateLimiter = checkNotNull(rateLimiter);
+    }
+  }
+
   @Inject
   TaskGroups(
       ShutdownRegistry shutdownRegistry,
       Storage storage,
-      SchedulingSettings schedulingSettings,
+      TaskGroupsSettings settings,
       SchedulingAction schedulingAction,
-      FlappingTaskSettings flappingTaskSettings,
       Clock clock,
+      RescheduleCalculator rescheduleCalculator,
       Preemptor preemptor) {
 
     this(
         createThreadPool(shutdownRegistry),
         storage,
-        schedulingSettings.getBackoff(),
-        schedulingSettings.getRateLimit(),
+        settings.taskGroupBackoff,
+        settings.rateLimiter,
         schedulingAction,
-        flappingTaskSettings.getFlappingThreashold(),
         clock,
-        flappingTaskSettings.getBackoff(),
+        rescheduleCalculator,
         preemptor);
   }
 
   TaskGroups(
       final ScheduledExecutorService executor,
       final Storage storage,
-      final BackoffStrategy backoffStrategy,
+      final BackoffStrategy taskGroupBackoffStrategy,
       final RateLimiter rateLimiter,
       final SchedulingAction schedulingAction,
-      final Amount<Long, Time> flappingThreshold,
       final Clock clock,
-      final BackoffStrategy flappingBackoffStrategy,
+      final RescheduleCalculator rescheduleCalculator,
       final Preemptor preemptor) {
 
     this.storage = checkNotNull(storage);
     checkNotNull(executor);
-    checkNotNull(backoffStrategy);
+    checkNotNull(taskGroupBackoffStrategy);
+    checkNotNull(rateLimiter);
     checkNotNull(schedulingAction);
-    this.flappingThreshold = checkNotNull(flappingThreshold);
     this.clock = checkNotNull(clock);
-    this.flappingBackoffStrategy = checkNotNull(flappingBackoffStrategy);
+    this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
     this.preemptor = checkNotNull(preemptor);
 
     final SchedulingAction rateLimitedAction = new SchedulingAction() {
@@ -141,7 +135,7 @@ public class TaskGroups implements EventSubscriber {
 
     groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
       @Override public TaskGroup load(GroupKey key) {
-        TaskGroup group = new TaskGroup(key, backoffStrategy);
+        TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
         LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
         startGroup(group, executor, rateLimitedAction);
         return group;
@@ -216,76 +210,6 @@ public class TaskGroups implements EventSubscriber {
     groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
   }
 
-  private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
-    if (!task.isSetAncestorId()) {
-      return Optional.absent();
-    }
-
-    ImmutableSet<IScheduledTask> res =
-        Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
-    return Optional.fromNullable(Iterables.getOnlyElement(res, null));
-  }
-
-  private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
-      Predicates.in(Tasks.ACTIVE_STATES);
-
-  private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
-      new Function<ITaskEvent, ScheduleStatus>() {
-    @Override public ScheduleStatus apply(ITaskEvent input) {
-      return input.getStatus();
-    }
-  };
-
-  private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
-      EnumSet.of(RESTARTING, KILLING);
-
-  private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
-    @Override public boolean apply(IScheduledTask task) {
-      if (!task.isSetTaskEvents()) {
-        return false;
-      }
-
-      List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
-
-      // Avoid penalizing tasks that were interrupted by outside action, such as a user
-      // restarting them.
-      if (Iterables.any(Iterables.transform(events, TO_STATUS),
-          Predicates.in(INTERRUPTED_TASK_STATES))) {
-        return false;
-      }
-
-      ITaskEvent terminalEvent = Iterables.get(events, 0);
-      ScheduleStatus terminalState = terminalEvent.getStatus();
-      Preconditions.checkState(Tasks.isTerminated(terminalState));
-
-      ITaskEvent activeEvent =
-          Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
-
-      long thresholdMs = flappingThreshold.as(Time.MILLISECONDS);
-
-      return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
-    }
-  };
-
-  private long getTaskReadyTimestamp(IScheduledTask task) {
-    Optional<IScheduledTask> curTask = getTaskAncestor(task);
-    long penaltyMs = 0;
-    while (curTask.isPresent() && flapped.apply(curTask.get())) {
-      LOG.info(
-          String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
-      long newPenalty = flappingBackoffStrategy.calculateBackoffMs(penaltyMs);
-      // If the backoff strategy is truncated then there is no need for us to continue.
-      if (newPenalty == penaltyMs) {
-        break;
-      }
-      penaltyMs = newPenalty;
-      curTask = getTaskAncestor(curTask.get());
-    }
-
-    return penaltyMs + clock.nowMillis();
-  }
-
   /**
    * Informs the task groups of a task state change.
    * <p>
@@ -297,7 +221,9 @@ public class TaskGroups implements EventSubscriber {
   @Subscribe
   public synchronized void taskChangedState(TaskStateChange stateChange) {
     if (stateChange.getNewState() == PENDING) {
-      add(stateChange.getTask().getAssignedTask(), getTaskReadyTimestamp(stateChange.getTask()));
+      add(
+          stateChange.getTask().getAssignedTask(),
+          rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
     }
   }
 
@@ -314,7 +240,7 @@ public class TaskGroups implements EventSubscriber {
     for (IScheduledTask task
         : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
 
-      add(task.getAssignedTask(), getTaskReadyTimestamp(task));
+      add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
     }
   }
 
@@ -374,40 +300,4 @@ public class TaskGroups implements EventSubscriber {
      */
     boolean schedule(String taskId);
   }
-
-  static class SchedulingSettings {
-    private final BackoffStrategy backoff;
-    private final RateLimiter rateLimit;
-
-    SchedulingSettings(BackoffStrategy backoff, RateLimiter rateLimit) {
-      this.backoff = checkNotNull(backoff);
-      this.rateLimit = checkNotNull(rateLimit);
-    }
-
-    BackoffStrategy getBackoff() {
-      return backoff;
-    }
-
-    RateLimiter getRateLimit() {
-      return rateLimit;
-    }
-  }
-
-  static class FlappingTaskSettings {
-    private final BackoffStrategy backoff;
-    private final Amount<Long, Time> flappingThreashold;
-
-    FlappingTaskSettings(BackoffStrategy backoff, Amount<Long, Time> flappingThreashold) {
-      this.backoff = checkNotNull(backoff);
-      this.flappingThreashold = checkNotNull(flappingThreashold);
-    }
-
-    BackoffStrategy getBackoff() {
-      return backoff;
-    }
-
-    Amount<Long, Time> getFlappingThreashold() {
-      return flappingThreashold;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
index e7a2f21..a747f2b 100644
--- a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
@@ -49,6 +49,8 @@ import com.twitter.aurora.scheduler.Driver;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl.RescheduleCalculatorSettings;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.Tasks;
 import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
@@ -143,9 +145,14 @@ public class TaskSchedulerTest extends EasyMockTest {
         retryStrategy,
         rateLimiter,
         scheduler,
-        flappingThreshold,
         clock,
-        flappingStrategy,
+        new RescheduleCalculatorImpl(
+            storage,
+            new RescheduleCalculatorSettings(
+                flappingStrategy,
+                flappingThreshold,
+                Amount.of(5, Time.SECONDS)),
+            clock),
         preemptor);
   }