You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2013/12/05 20:58:17 UTC
[5/9] git commit: Adding random jitter on pending task rescheduling after startup
Adding random jitter on pending task rescheduling after startup
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/aa9f826f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/aa9f826f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/aa9f826f
Branch: refs/heads/master
Commit: aa9f826f297e1aa7f92abb8c3972d3d6c0d0e5ea
Parents: ecda2b3
Author: Maxim Khutornenko <mk...@twitter.com>
Authored: Thu Dec 5 08:09:41 2013 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Thu Dec 5 08:09:41 2013 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/AsyncModule.java | 23 ++-
.../scheduler/async/RescheduleCalculator.java | 188 +++++++++++++++++++
.../aurora/scheduler/async/TaskGroups.java | 162 +++-------------
.../scheduler/async/TaskSchedulerTest.java | 11 +-
4 files changed, 239 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
index 37913a8..db07841 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
@@ -30,7 +30,9 @@ import com.google.inject.TypeLiteral;
import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
+import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
import com.twitter.aurora.scheduler.events.PubsubEventModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
@@ -45,8 +47,6 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
-import static com.twitter.aurora.scheduler.async.TaskGroups.FlappingTaskSettings;
-import static com.twitter.aurora.scheduler.async.TaskGroups.SchedulingSettings;
/**
* Binding module for async task management.
@@ -103,6 +103,11 @@ public class AsyncModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
Arg.create(Amount.of(5L, Time.MINUTES));
+ @CmdLine(name = "max_reschedule_task_delay_on_startup",
+ help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
+ private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
+ Arg.create(Amount.of(30, Time.SECONDS));
+
@CmdLine(name = "preemption_delay",
help = "Time interval after which a pending task becomes eligible to preempt other tasks")
private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
@@ -148,13 +153,17 @@ public class AsyncModule extends AbstractModule {
binder().install(new PrivateModule() {
@Override protected void configure() {
- bind(SchedulingSettings.class).toInstance(new SchedulingSettings(
+ bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
- bind(FlappingTaskSettings.class).toInstance(new FlappingTaskSettings(
- new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
- FLAPPING_THRESHOLD.get()
- ));
+
+ bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+ .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+ new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
+ FLAPPING_THRESHOLD.get(),
+ MAX_RESCHEDULING_DELAY.get()));
+
+ bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
bind(SchedulingAction.class).to(TaskScheduler.class);
bind(TaskScheduler.class).in(Singleton.class);
if (ENABLE_PREEMPTOR.get()) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
new file mode 100644
index 0000000..eefc03a
--- /dev/null
+++ b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+
+/**
+ * Calculates scheduling delays for tasks.
+ */
+interface RescheduleCalculator {
+ /**
+ * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
+ *
+ * @param task Task to calculate timestamp for.
+ * @return Timestamp in msec.
+ */
+ long getStartupReadyTimeMs(IScheduledTask task);
+
+ /**
+ * Gets a timestamp for the task to become eligible for (re)scheduling.
+ *
+ * @param task Task to calculate timestamp for.
+ * @return Timestamp in msec.
+ */
+ long getReadyTimeMs(IScheduledTask task);
+
+ class RescheduleCalculatorImpl implements RescheduleCalculator {
+
+ private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
+
+ private final Storage storage;
+ private final RescheduleCalculatorSettings settings;
+ private final Clock clock;
+ private final Random random = new Random.SystemRandom(new java.util.Random());
+
+ private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
+ Predicates.in(Tasks.ACTIVE_STATES);
+
+ private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
+ new Function<ITaskEvent, ScheduleStatus>() {
+ @Override public ScheduleStatus apply(ITaskEvent input) {
+ return input.getStatus();
+ }
+ };
+
+ private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
+ EnumSet.of(RESTARTING, KILLING);
+
+ private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
+ @Override public boolean apply(IScheduledTask task) {
+ if (!task.isSetTaskEvents()) {
+ return false;
+ }
+
+ List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
+
+ // Avoid penalizing tasks that were interrupted by outside action, such as a user
+ // restarting them.
+ if (Iterables.any(Iterables.transform(events, TO_STATUS),
+ Predicates.in(INTERRUPTED_TASK_STATES))) {
+ return false;
+ }
+
+ ITaskEvent terminalEvent = Iterables.get(events, 0);
+ ScheduleStatus terminalState = terminalEvent.getStatus();
+ Preconditions.checkState(Tasks.isTerminated(terminalState));
+
+ ITaskEvent activeEvent =
+ Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+
+ long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
+
+ return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
+ }
+ };
+
+ static class RescheduleCalculatorSettings {
+ private final BackoffStrategy flappingTaskBackoff;
+ private final Amount<Long, Time> flappingTaskThreashold;
+ private final Amount<Integer, Time> maxStartupRescheduleDelay;
+
+ RescheduleCalculatorSettings(
+ BackoffStrategy flappingTaskBackoff,
+ Amount<Long, Time> flappingTaskThreashold,
+ Amount<Integer, Time> maxStartupRescheduleDelay) {
+
+ this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
+ this.flappingTaskThreashold = checkNotNull(flappingTaskThreashold);
+ this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
+ }
+ }
+
+ @Inject
+ RescheduleCalculatorImpl(
+ Storage storage,
+ RescheduleCalculatorSettings settings,
+ Clock clock) {
+
+ this.storage = checkNotNull(storage);
+ this.settings = checkNotNull(settings);
+ this.clock = checkNotNull(clock);
+ }
+
+ @Override
+ public long getStartupReadyTimeMs(IScheduledTask task) {
+ return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
+ + getTaskReadyTimestamp(task);
+ }
+
+ @Override
+ public long getReadyTimeMs(IScheduledTask task) {
+ return getTaskReadyTimestamp(task);
+ }
+
+ private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
+ if (!task.isSetAncestorId()) {
+ return Optional.absent();
+ }
+
+ ImmutableSet<IScheduledTask> res =
+ Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+
+ return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+ }
+
+ private long getTaskReadyTimestamp(IScheduledTask task) {
+ Optional<IScheduledTask> curTask = getTaskAncestor(task);
+ long penaltyMs = 0;
+ while (curTask.isPresent() && flapped.apply(curTask.get())) {
+ LOG.info(
+ String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
+ long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
+ // If the backoff strategy is truncated then there is no need for us to continue.
+ if (newPenalty == penaltyMs) {
+ break;
+ }
+ penaltyMs = newPenalty;
+ curTask = getTaskAncestor(curTask.get());
+ }
+
+ return penaltyMs + clock.nowMillis();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
index 9ea0229..f95f719 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
@@ -15,9 +15,6 @@
*/
package com.twitter.aurora.scheduler.async;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -25,23 +22,16 @@ import java.util.logging.Logger;
import javax.inject.Inject;
-import com.google.common.base.Function;
import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.aurora.gen.ScheduleStatus;
import com.twitter.aurora.scheduler.base.JobKeys;
import com.twitter.aurora.scheduler.base.Query;
import com.twitter.aurora.scheduler.base.Tasks;
@@ -53,7 +43,6 @@ import com.twitter.aurora.scheduler.storage.Storage;
import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
@@ -65,9 +54,7 @@ import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
/**
@@ -85,51 +72,58 @@ public class TaskGroups implements EventSubscriber {
private final Storage storage;
private final LoadingCache<GroupKey, TaskGroup> groups;
- private final Amount<Long, Time> flappingThreshold;
- private final BackoffStrategy flappingBackoffStrategy;
private final Clock clock;
+ private final RescheduleCalculator rescheduleCalculator;
private final Preemptor preemptor;
+ static class TaskGroupsSettings {
+ private final BackoffStrategy taskGroupBackoff;
+ private final RateLimiter rateLimiter;
+
+ TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
+ this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
+ this.rateLimiter = checkNotNull(rateLimiter);
+ }
+ }
+
@Inject
TaskGroups(
ShutdownRegistry shutdownRegistry,
Storage storage,
- SchedulingSettings schedulingSettings,
+ TaskGroupsSettings settings,
SchedulingAction schedulingAction,
- FlappingTaskSettings flappingTaskSettings,
Clock clock,
+ RescheduleCalculator rescheduleCalculator,
Preemptor preemptor) {
this(
createThreadPool(shutdownRegistry),
storage,
- schedulingSettings.getBackoff(),
- schedulingSettings.getRateLimit(),
+ settings.taskGroupBackoff,
+ settings.rateLimiter,
schedulingAction,
- flappingTaskSettings.getFlappingThreashold(),
clock,
- flappingTaskSettings.getBackoff(),
+ rescheduleCalculator,
preemptor);
}
TaskGroups(
final ScheduledExecutorService executor,
final Storage storage,
- final BackoffStrategy backoffStrategy,
+ final BackoffStrategy taskGroupBackoffStrategy,
final RateLimiter rateLimiter,
final SchedulingAction schedulingAction,
- final Amount<Long, Time> flappingThreshold,
final Clock clock,
- final BackoffStrategy flappingBackoffStrategy,
+ final RescheduleCalculator rescheduleCalculator,
final Preemptor preemptor) {
this.storage = checkNotNull(storage);
checkNotNull(executor);
- checkNotNull(backoffStrategy);
+ checkNotNull(taskGroupBackoffStrategy);
+ checkNotNull(rateLimiter);
checkNotNull(schedulingAction);
- this.flappingThreshold = checkNotNull(flappingThreshold);
this.clock = checkNotNull(clock);
- this.flappingBackoffStrategy = checkNotNull(flappingBackoffStrategy);
+ this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
this.preemptor = checkNotNull(preemptor);
final SchedulingAction rateLimitedAction = new SchedulingAction() {
@@ -141,7 +135,7 @@ public class TaskGroups implements EventSubscriber {
groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
@Override public TaskGroup load(GroupKey key) {
- TaskGroup group = new TaskGroup(key, backoffStrategy);
+ TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
startGroup(group, executor, rateLimitedAction);
return group;
@@ -216,76 +210,6 @@ public class TaskGroups implements EventSubscriber {
groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
}
- private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
- if (!task.isSetAncestorId()) {
- return Optional.absent();
- }
-
- ImmutableSet<IScheduledTask> res =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
- return Optional.fromNullable(Iterables.getOnlyElement(res, null));
- }
-
- private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
- Predicates.in(Tasks.ACTIVE_STATES);
-
- private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
- new Function<ITaskEvent, ScheduleStatus>() {
- @Override public ScheduleStatus apply(ITaskEvent input) {
- return input.getStatus();
- }
- };
-
- private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
- EnumSet.of(RESTARTING, KILLING);
-
- private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
- @Override public boolean apply(IScheduledTask task) {
- if (!task.isSetTaskEvents()) {
- return false;
- }
-
- List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
-
- // Avoid penalizing tasks that were interrupted by outside action, such as a user
- // restarting them.
- if (Iterables.any(Iterables.transform(events, TO_STATUS),
- Predicates.in(INTERRUPTED_TASK_STATES))) {
- return false;
- }
-
- ITaskEvent terminalEvent = Iterables.get(events, 0);
- ScheduleStatus terminalState = terminalEvent.getStatus();
- Preconditions.checkState(Tasks.isTerminated(terminalState));
-
- ITaskEvent activeEvent =
- Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
-
- long thresholdMs = flappingThreshold.as(Time.MILLISECONDS);
-
- return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
- }
- };
-
- private long getTaskReadyTimestamp(IScheduledTask task) {
- Optional<IScheduledTask> curTask = getTaskAncestor(task);
- long penaltyMs = 0;
- while (curTask.isPresent() && flapped.apply(curTask.get())) {
- LOG.info(
- String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
- long newPenalty = flappingBackoffStrategy.calculateBackoffMs(penaltyMs);
- // If the backoff strategy is truncated then there is no need for us to continue.
- if (newPenalty == penaltyMs) {
- break;
- }
- penaltyMs = newPenalty;
- curTask = getTaskAncestor(curTask.get());
- }
-
- return penaltyMs + clock.nowMillis();
- }
-
/**
* Informs the task groups of a task state change.
* <p>
@@ -297,7 +221,9 @@ public class TaskGroups implements EventSubscriber {
@Subscribe
public synchronized void taskChangedState(TaskStateChange stateChange) {
if (stateChange.getNewState() == PENDING) {
- add(stateChange.getTask().getAssignedTask(), getTaskReadyTimestamp(stateChange.getTask()));
+ add(
+ stateChange.getTask().getAssignedTask(),
+ rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
}
}
@@ -314,7 +240,7 @@ public class TaskGroups implements EventSubscriber {
for (IScheduledTask task
: Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
- add(task.getAssignedTask(), getTaskReadyTimestamp(task));
+ add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
}
}
@@ -374,40 +300,4 @@ public class TaskGroups implements EventSubscriber {
*/
boolean schedule(String taskId);
}
-
- static class SchedulingSettings {
- private final BackoffStrategy backoff;
- private final RateLimiter rateLimit;
-
- SchedulingSettings(BackoffStrategy backoff, RateLimiter rateLimit) {
- this.backoff = checkNotNull(backoff);
- this.rateLimit = checkNotNull(rateLimit);
- }
-
- BackoffStrategy getBackoff() {
- return backoff;
- }
-
- RateLimiter getRateLimit() {
- return rateLimit;
- }
- }
-
- static class FlappingTaskSettings {
- private final BackoffStrategy backoff;
- private final Amount<Long, Time> flappingThreashold;
-
- FlappingTaskSettings(BackoffStrategy backoff, Amount<Long, Time> flappingThreashold) {
- this.backoff = checkNotNull(backoff);
- this.flappingThreashold = checkNotNull(flappingThreashold);
- }
-
- BackoffStrategy getBackoff() {
- return backoff;
- }
-
- Amount<Long, Time> getFlappingThreashold() {
- return flappingThreashold;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/aa9f826f/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
index e7a2f21..a747f2b 100644
--- a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
@@ -49,6 +49,8 @@ import com.twitter.aurora.scheduler.Driver;
import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import com.twitter.aurora.scheduler.async.TaskGroups.SchedulingAction;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl.RescheduleCalculatorSettings;
import com.twitter.aurora.scheduler.base.Query;
import com.twitter.aurora.scheduler.base.Tasks;
import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
@@ -143,9 +145,14 @@ public class TaskSchedulerTest extends EasyMockTest {
retryStrategy,
rateLimiter,
scheduler,
- flappingThreshold,
clock,
- flappingStrategy,
+ new RescheduleCalculatorImpl(
+ storage,
+ new RescheduleCalculatorSettings(
+ flappingStrategy,
+ flappingThreshold,
+ Amount.of(5, Time.SECONDS)),
+ clock),
preemptor);
}