You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/18 01:37:48 UTC

[2/2] git commit: work in progress.

work in progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4acd3cca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4acd3cca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4acd3cca

Branch: refs/heads/wfarner/throttled_state
Commit: 4acd3cca37f1f362dc6c6fc24c034ef450920f4a
Parents: 65f455c
Author: Bill Farner <bi...@twitter.com>
Authored: Tue Dec 17 16:37:17 2013 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Tue Dec 17 16:37:17 2013 -0800

----------------------------------------------------------------------
 .../scheduler/async/RescheduleCalculator.java   |   7 +-
 .../aurora/scheduler/async/TaskThrottler.java   |  30 +++
 .../aurora/scheduler/events/PubsubEvent.java    |  41 ----
 .../scheduler/state/StateManagerImpl.java       |   6 -
 .../scheduler/state/TaskStateMachine.java       |  25 +--
 .../async/RescheduleCalculatorImplTest.java     | 185 +++++++++++++++++++
 .../scheduler/async/TaskSchedulerTest.java      | 141 +-------------
 .../state/BaseSchedulerCoreImplTest.java        | 144 ++++++++-------
 .../scheduler/state/StateManagerImplTest.java   |  36 +++-
 .../scheduler/state/TaskStateMachineTest.java   |  10 +
 10 files changed, 345 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
index 85e7233..1dac9c9 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
@@ -40,7 +40,6 @@ import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.Random;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -137,11 +136,15 @@ public interface RescheduleCalculator {
     }
 
+    /**
+     * Creates a calculator backed by {@code storage} and configured by {@code settings}.
+     *
+     * @param storage Storage backing the calculator; must not be null.
+     * @param settings Reschedule settings; must not be null.
+     */
     @Inject
-    RescheduleCalculatorImpl(
-        Storage storage,
-        RescheduleCalculatorSettings settings,
-        Clock clock) {
-
+    RescheduleCalculatorImpl(
+        Storage storage,
+        RescheduleCalculatorSettings settings) {
       this.storage = checkNotNull(storage);
       this.settings = checkNotNull(settings);
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
new file mode 100644
index 0000000..f4a5d3a
--- /dev/null
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+
+/**
+ * Placeholder {@link EventSubscriber} with no methods yet; presumably for THROTTLED tasks.
+ */
+class TaskThrottler implements EventSubscriber {
+  @Inject
+  TaskThrottler() {
+    // No dependencies yet.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java b/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
index 400d8b7..5b4af7d 100644
--- a/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
@@ -201,47 +201,6 @@ public interface PubsubEvent {
     }
   }
 
-  public static class TaskRescheduled implements PubsubEvent {
-    private final String role;
-    private final String job;
-    private final int instance;
-
-    public TaskRescheduled(String role, String job, int instance) {
-      this.role = role;
-      this.job = job;
-      this.instance = instance;
-    }
-
-    public String getRole() {
-      return role;
-    }
-
-    public String getJob() {
-      return job;
-    }
-
-    public int getInstance() {
-      return instance;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof TaskRescheduled)) {
-        return false;
-      }
-
-      TaskRescheduled other = (TaskRescheduled) o;
-      return Objects.equal(role, other.role)
-          && Objects.equal(job, other.job)
-          && Objects.equal(instance, other.instance);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(role, job, instance);
-    }
-  }
-
   public static class StorageStarted implements PubsubEvent {
     @Override
     public boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
index e1045fc..f208798 100644
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
@@ -367,12 +367,6 @@ public class StateManagerImpl implements StateManager {
             }
 
             createStateMachine(task).updateState(newState, Optional.of(auditMessage));
-            ITaskConfig taskInfo = task.getAssignedTask().getTask();
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskRescheduled(
-                    taskInfo.getOwner().getRole(),
-                    taskInfo.getJobName(),
-                    task.getAssignedTask().getInstanceId()));
             break;
 
           case UPDATE_STATE:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
index f7c4b6d..96caa05 100644
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
@@ -78,6 +78,7 @@ class TaskStateMachine {
   private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
   private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
   private static final State STARTING = State.create(ScheduleStatus.STARTING);
+  private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
   private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
 
   @VisibleForTesting
@@ -264,29 +265,23 @@ class TaskStateMachine {
       }
     };
 
+    final Closure<Transition<State>> deleteIfKilling =
+        Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+
     stateMachine = StateMachine.<State>builder(taskId)
         .logTransitions()
         .initialState(State.create(initialState))
         .addState(
             Rule.from(INIT)
-                .to(PENDING, UNKNOWN))
+                .to(PENDING, THROTTLED, UNKNOWN))
         .addState(
             Rule.from(PENDING)
                 .to(ASSIGNED, KILLING)
-                .withCallback(
-                    new Closure<Transition<State>>() {
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
-                          case KILLING:
-                            addWork(WorkCommand.DELETE);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
-                      }
-                    }
-                ))
+                .withCallback(deleteIfKilling))
+        .addState(
+            Rule.from(THROTTLED)
+            .to(PENDING, KILLING)
+            .withCallback(deleteIfKilling))
         .addState(
             Rule.from(ASSIGNED)
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
new file mode 100644
index 0000000..2465686
--- /dev/null
+++ b/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.aurora.gen.AssignedTask;
+import com.twitter.aurora.gen.Identity;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskEvent;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl.RescheduleCalculatorSettings;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.mem.MemStorage;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.testing.FakeClock;
+
+import static com.twitter.aurora.gen.ScheduleStatus.ASSIGNED;
+import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
+import static com.twitter.aurora.gen.ScheduleStatus.INIT;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+import static com.twitter.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the flapping penalty computed by {@link RescheduleCalculatorImpl}.
+ */
+public class RescheduleCalculatorImplTest extends EasyMockTest {
+
+  // makeFlappyTask() leaves each task RUNNING for 10 seconds, well under this threshold.
+  private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Integer, Time> MAX_STARTUP_DELAY = Amount.of(5, Time.SECONDS);
+
+  private FakeClock clock;
+  private Storage storage;
+  private BackoffStrategy flappingStrategy;
+  private RescheduleCalculator calculator;
+
+  @Before
+  public void setUp() {
+    clock = new FakeClock();
+    storage = MemStorage.newEmptyStorage();
+    flappingStrategy = createMock(BackoffStrategy.class);
+    calculator = new RescheduleCalculatorImpl(
+        storage,
+        new RescheduleCalculatorSettings(
+            flappingStrategy,
+            FLAPPING_THRESHOLD,
+            MAX_STARTUP_DELAY));
+  }
+
+  @Test
+  public void testNoPenaltyForNoAncestor() {
+    // If a task doesn't have an ancestor there should be no penalty for flapping.
+    control.replay();
+
+    assertEquals(0L, calculator.getFlappingPenaltyMs(makeTask("a1", INIT)));
+  }
+
+  @Test
+  public void testFlappingTasksBackoffTruncation() {
+    makeFlappyTask("a0", null);
+    makeFlappyTask("a1", "a0");
+    makeFlappyTask("a2", "a1");
+    IScheduledTask taskA3 = withAncestor(makeTask("a3", INIT), "a2");
+
+    // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
+    // entire history.
+    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
+    expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
+
+    control.replay();
+
+    assertEquals(5L, calculator.getFlappingPenaltyMs(taskA3));
+  }
+
+  @Test
+  public void testFlappingTasks() {
+    makeFlappyTask("a0", null);
+    IScheduledTask taskA1 = withAncestor(makeTask("a1", INIT), "a0");
+
+    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
+
+    control.replay();
+
+    assertEquals(5L, calculator.getFlappingPenaltyMs(taskA1));
+  }
+
+  @Test
+  public void testNoPenaltyForInterruptedTasks() {
+    // The ancestor passed through RESTARTING (an interruption), so it should not be penalized.
+    makeFlappyTaskWithStates(
+        "a0",
+        ImmutableList.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED),
+        null);
+    IScheduledTask taskA1 = withAncestor(makeTask("a1", INIT), "a0");
+
+    control.replay();
+
+    assertEquals(0L, calculator.getFlappingPenaltyMs(taskA1));
+  }
+
+  private static IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
+                .setEnvironment("env-" + taskId)
+                .setJobName("job-" + taskId))));
+  }
+
+  private static IScheduledTask withAncestor(IScheduledTask task, String ancestorId) {
+    return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId));
+  }
+
+  /**
+   * Builds a task whose events follow {@code states} in iteration order, 10 seconds apart, and
+   * saves it to storage unless a task with the same ID is already stored.
+   */
+  private IScheduledTask makeFlappyTaskWithStates(
+      String taskId,
+      Iterable<ScheduleStatus> states,
+      @Nullable String ancestorId) {
+
+    Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
+
+    ScheduledTask base = makeTask(taskId, INIT).newBuilder();
+
+    for (ScheduleStatus status : states) {
+      base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
+      clock.advance(timeInState);
+    }
+
+    base.setAncestorId(ancestorId);
+
+    final IScheduledTask result = IScheduledTask.build(base);
+
+    // Insert the task if it doesn't already exist.
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
+        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+        if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
+          taskStore.saveTasks(ImmutableSet.of(result));
+        }
+      }
+    });
+
+    return result;
+  }
+
+  private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
+    return makeFlappyTaskWithStates(
+        taskId,
+        ImmutableList.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
+        ancestorId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
index a747f2b..5ac81e4 100644
--- a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
@@ -151,8 +151,7 @@ public class TaskSchedulerTest extends EasyMockTest {
             new RescheduleCalculatorSettings(
                 flappingStrategy,
                 flappingThreshold,
-                Amount.of(5, Time.SECONDS)),
-            clock),
+                Amount.of(5, Time.SECONDS))),
         preemptor);
   }
 
@@ -588,144 +587,6 @@ public class TaskSchedulerTest extends EasyMockTest {
     timeoutCapture.getValue().run();
   }
 
-  @Test
-  public void testNoPenaltyForNoAncestor() {
-    // If a task doesn't have an ancestor there should be no penality for flapping.
-    expectAnyMaintenanceCalls();
-    IScheduledTask task = makeTask("a1", INIT);
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(1);
-    expectTaskScheduled(task);
-
-    replayAndCreateScheduler();
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(task, INIT, PENDING);
-
-    first.getValue().run();
-  }
-
-  @Test
-  public void testFlappingTasksBackoffTruncation() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTask("a0", null);
-    makeFlappyTask("a1", "a0");
-    makeFlappyTask("a2", "a1");
-    IScheduledTask taskA3 = IScheduledTask.build(makeTask("a3", INIT).newBuilder()
-        .setAncestorId("a2"));
-
-    expectOfferDeclineIn(10);
-
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-    // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
-    // entire history.
-    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
-    expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
-    Capture<Runnable> flapping = expectTaskRetryIn(10);
-
-    expectTaskScheduled(taskA3);
-
-    replayAndCreateScheduler();
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA3, INIT, PENDING);
-
-    first.getValue().run();
-    clock.waitFor(10);
-    flapping.getValue().run();
-  }
-
-  @Test
-  public void testFlappingTasks() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTask("a0", null);
-    IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
-        .setAncestorId("a0"));
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-
-    expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
-    // Since A1 has been penalized, the task has to wait for another 10 ms until the penalty has
-    // expired.
-    Capture<Runnable> flapping = expectTaskRetryIn(10);
-
-    expectTaskScheduled(taskA1);
-
-    replayAndCreateScheduler();
-
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA1, INIT, PENDING);
-
-    first.getValue().run();
-    clock.waitFor(10);
-    flapping.getValue().run();
-  }
-
-  @Test
-  public void testNoPenaltyForInterruptedTasks() {
-    expectAnyMaintenanceCalls();
-
-    makeFlappyTaskWithStates("a0", EnumSet.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED), null);
-    IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
-        .setAncestorId("a0"));
-
-    expectOfferDeclineIn(10);
-    Capture<Runnable> first = expectTaskGroupBackoff(10);
-
-    expectTaskScheduled(taskA1);
-
-    replayAndCreateScheduler();
-
-    offerQueue.addOffer(OFFER_A);
-
-    changeState(taskA1, INIT, PENDING);
-
-    first.getValue().run();
-  }
-
-  private IScheduledTask makeFlappyTaskWithStates(
-      String taskId,
-      Iterable<ScheduleStatus> states,
-      @Nullable String ancestorId) {
-
-    Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
-
-    ScheduledTask base = makeTask(taskId, INIT).newBuilder();
-
-    for (ScheduleStatus status : states) {
-      base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
-      clock.advance(timeInState);
-    }
-
-    base.setAncestorId(ancestorId);
-
-    final IScheduledTask result = IScheduledTask.build(base);
-
-    // Insert the task if it doesn't already exist.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
-          taskStore.saveTasks(ImmutableSet.of(result));
-        }
-      }
-    });
-
-    return result;
-  }
-
-  private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
-    return makeFlappyTaskWithStates(
-        taskId,
-        EnumSet.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
-        ancestorId);
-  }
-
   private TaskInfo makeTaskInfo(IScheduledTask task) {
     return TaskInfo.newBuilder()
         .setName(Tasks.id(task))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 3e4a502..f9fb87c 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -41,6 +41,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -60,6 +61,7 @@ import com.twitter.aurora.gen.TaskQuery;
 import com.twitter.aurora.gen.ValueConstraint;
 import com.twitter.aurora.scheduler.Driver;
 import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator;
 import com.twitter.aurora.scheduler.base.JobKeys;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.ScheduleException;
@@ -75,7 +77,6 @@ import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
 import com.twitter.aurora.scheduler.storage.StorageBackfill;
 import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IIdentity;
 import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
 import com.twitter.aurora.scheduler.storage.entities.IJobKey;
 import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
@@ -139,6 +140,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private CronJobManager cron;
   private FakeClock clock;
   private Closure<PubsubEvent> eventSink;
+  private RescheduleCalculator rescheduleCalculator;
   private ShutdownRegistry shutdownRegistry;
   private JobFilter jobFilter;
 
@@ -156,6 +158,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     clock = new FakeClock();
     eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
     eventSink.execute(EasyMock.<PubsubEvent>anyObject());
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
     cronScheduler = createMock(CronScheduler.class);
     shutdownRegistry = createMock(ShutdownRegistry.class);
     jobFilter = createMock(JobFilter.class);
@@ -186,12 +189,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private void buildScheduler(Storage newStorage) throws Exception {
     this.storage = newStorage;
     storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
         StorageBackfill.backfill(storeProvider, clock);
       }
     });
 
-    stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+    stateManager = new StateManagerImpl(
+        storage,
+        clock,
+        driver,
+        taskIdGenerator,
+        eventSink,
+        rescheduleCalculator);
     ImmediateJobManager immediateManager = new ImmediateJobManager(stateManager, storage);
     cron = new CronJobManager(stateManager, storage, cronScheduler, shutdownRegistry);
     scheduler = new SchedulerCoreImpl(
@@ -414,16 +424,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     }
   }
 
-  @Test
-  public void testSortableTaskIds() throws Exception {
-    control.replay();
-    buildScheduler();
-
-    for (IScheduledTask task : getTasks(Query.unscoped())) {
-      assertEquals(IIdentity.build(OWNER_A), task.getAssignedTask().getTask().getOwner());
-    }
-  }
-
   @Test(expected = ScheduleException.class)
   public void testCreateDuplicateJob() throws Exception {
     control.replay();
@@ -579,7 +579,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     assertTaskCount(1);
     assertEquals(PENDING, getTask(taskId).getStatus());
 
-    changeStatus(Query.taskScoped(taskId), ASSIGNED);
+    changeStatus(taskId, ASSIGNED);
 
     scheduler.startCronJob(KEY_A);
     assertTaskCount(2);
@@ -651,20 +651,28 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     }
   }
 
+  private IExpectationSetters<Long> expectTaskNotThrottled() {
+    IScheduledTask anyTask = EasyMock.anyObject();
+    return expect(rescheduleCalculator.getFlappingPenaltyMs(anyTask)).andReturn(0L);
+  }
+
   @Test
   public void testServiceTasksRescheduled() throws Exception {
+    int numServiceTasks = 5;
+
+    expectTaskNotThrottled().times(numServiceTasks);
+
     control.replay();
     buildScheduler();
 
     // Schedule 5 service and 5 non-service tasks.
-    scheduler.createJob(makeJob(KEY_A, 5));
+    scheduler.createJob(makeJob(KEY_A, numServiceTasks));
     TaskConfig task = productionTask().setIsService(true);
     scheduler.createJob(
         makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
 
     assertEquals(10, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
     assertEquals(10, getTasksByStatus(STARTING).size());
 
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
@@ -685,11 +693,14 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testServiceTaskIgnoresMaxFailures() throws Exception {
+    int totalFailures = 10;
+
+    expectTaskNotThrottled().times(totalFailures);
+
     control.replay();
     buildScheduler();
 
     int maxFailures = 5;
-    int totalFailures = 10;
 
     // Schedule a service task.
     TaskConfig task = productionTask()
@@ -703,9 +714,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       String taskId = Tasks.id(
           getOnlyTask(Query.jobScoped(KEY_A).active()));
 
-      changeStatus(taskId, ASSIGNED);
-      changeStatus(taskId, STARTING);
-      changeStatus(taskId, RUNNING);
+      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
       assertEquals(i - 1, getTask(taskId).getFailureCount());
       changeStatus(taskId, FAILED);
 
@@ -720,27 +729,26 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testTaskRescheduleOnKill() throws Exception {
+    int numTasks = 5;
+    // Every KILLED task is rescheduled, and each reschedule checks the flapping penalty once.
+    expectTaskNotThrottled().times(numTasks);
+
     control.replay();
     buildScheduler();
 
-    // Create 5 non-service and 5 service tasks.
-    scheduler.createJob(makeJob(KEY_A, 5));
-    TaskConfig task = productionTask().setIsService(true);
-    scheduler.createJob(
-        makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+    scheduler.createJob(makeJob(KEY_A, numTasks));
 
-    assertEquals(10, getTasksByStatus(PENDING).size());
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
-    assertEquals(10, getTasksByStatus(STARTING).size());
+    assertEquals(numTasks, getTasksByStatus(PENDING).size());
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
+    assertEquals(numTasks, getTasksByStatus(STARTING).size());
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
-    assertEquals(10, getTasksByStatus(RUNNING).size());
+    assertEquals(numTasks, getTasksByStatus(RUNNING).size());
 
     // All tasks will move back into PENDING state after getting KILLED.
     changeStatus(Query.roleScoped(ROLE_A), KILLED);
     Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
-    assertEquals(10, newTasks.size());
-    assertEquals(10, getTasksByStatus(KILLED).size());
+    assertEquals(numTasks, newTasks.size());
+    assertEquals(numTasks, getTasksByStatus(KILLED).size());
   }
 
   @Test
@@ -751,9 +759,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, 1));
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
-    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING, RUNNING);
     scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
     changeStatus(Query.roleScoped(ROLE_A), KILLED);
 
@@ -767,6 +773,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testFailedTaskIncrementsFailureCount() throws Exception {
     int maxFailures = 5;
+    expectTaskNotThrottled().times(maxFailures - 1);
+
     control.replay();
     buildScheduler();
 
@@ -780,9 +788,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       String taskId = Tasks.id(getOnlyTask(
           Query.jobScoped(KEY_A).active()));
 
-      changeStatus(taskId, ASSIGNED);
-      changeStatus(taskId, STARTING);
-      changeStatus(taskId, RUNNING);
+      changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
       assertEquals(i - 1, getTask(taskId).getFailureCount());
       changeStatus(taskId, FAILED);
 
@@ -821,8 +827,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     assertTaskCount(10);
 
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
-    changeStatus(Query.roleScoped(ROLE_A), STARTING);
+    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
     assertTaskCount(10);
     changeStatus(Query.roleScoped(ROLE_A), RUNNING);
     assertTaskCount(10);
@@ -897,9 +902,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     scheduler.createJob(makeJob(KEY_A, 1));
     String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED);
-    changeStatus(taskId, STARTING);
-    changeStatus(taskId, RUNNING);
+    changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
     scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
     assertEquals(KILLING, getTask(taskId).getStatus());
     assertEquals(1, getTasks(Query.roleScoped(ROLE_A)).size());
@@ -927,29 +930,25 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testLostTaskRescheduled() throws Exception {
     expectKillTask(2);
+    expectTaskNotThrottled().times(2);
 
     control.replay();
     buildScheduler();
 
-    int maxFailures = 5;
-    TaskConfig task = productionTask().setMaxTaskFailures(maxFailures);
-    scheduler.createJob(makeJob(KEY_A, task, 1));
+    scheduler.createJob(makeJob(KEY_A, 1));
     assertTaskCount(1);
 
     Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
+    String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
     assertEquals(1, tasks.size());
 
-    changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
+    changeStatus(taskId, ASSIGNED, LOST);
 
-    Query.Builder pendingQuery = Query.unscoped().byStatus(PENDING);
-    changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
-    assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
-    assertTaskCount(2);
+    String newTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
+    assertFalse(newTaskId.equals(taskId));
 
-    changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
-    changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
-    assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
-    assertTaskCount(3);
+    changeStatus(newTaskId, ASSIGNED, LOST);
+    assertFalse(newTaskId.equals(Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)))));
   }
 
   @Test
@@ -1019,6 +1018,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testSlaveDeletesTasks() throws Exception {
+    expectTaskNotThrottled();
+
     control.replay();
     buildScheduler();
 
@@ -1031,10 +1032,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     assignTask(taskId1, SLAVE_ID, SLAVE_HOST_1);
     assignTask(taskId2, SLAVE_ID, SLAVE_HOST_1);
 
-    changeStatus(taskId1, STARTING);
-    changeStatus(taskId1, RUNNING);
-    changeStatus(taskId2, STARTING);
-    changeStatus(taskId2, FINISHED);
+    changeStatus(taskId1, STARTING, RUNNING);
+    changeStatus(taskId2, STARTING, FINISHED);
 
     scheduler.tasksDeleted(ImmutableSet.of(taskId1, taskId2));
 
@@ -1051,13 +1050,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testRestartShards() throws Exception {
     expectKillTask(2);
+    expectTaskNotThrottled().times(2);
 
     control.replay();
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
-    changeStatus(Query.jobScoped(KEY_A), RUNNING);
+    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
     scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
     assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
     assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
@@ -1067,12 +1066,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test(expected = ScheduleException.class)
   public void testRestartNonexistentShard() throws Exception {
+    expectTaskNotThrottled();
+
     control.replay();
     buildScheduler();
 
     scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
-    changeStatus(Query.jobScoped(KEY_A), FINISHED);
+    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);
     scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);
   }
 
@@ -1109,6 +1109,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testPortResourceResetAfterReschedule() throws Exception {
     expectKillTask(1);
+    expectTaskNotThrottled();
 
     control.replay();
     buildScheduler();
@@ -1141,8 +1142,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(makeJob(KEY_A, 1));
 
     String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
-    changeStatus(taskId, ASSIGNED);
-    changeStatus(taskId, STARTING);
+    changeStatus(taskId, ASSIGNED, STARTING);
     changeStatus(taskId, FAILED, Optional.of("bad stuff happened"));
 
     String hostname = getLocalHost();
@@ -1458,12 +1458,16 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.setTaskStatus(query, status, message);
   }
 
-  public void changeStatus(Query.Builder query, ScheduleStatus status) {
-    changeStatus(query, status, Optional.<String>absent());
+  public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {
+    for (ScheduleStatus nextStatus
+        : ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
+
+      changeStatus(query, nextStatus, Optional.<String>absent());
+    }
   }
 
-  public void changeStatus(String taskId, ScheduleStatus status) {
-    changeStatus(taskId, status, Optional.<String>absent());
+  public void changeStatus(String taskId, ScheduleStatus status, ScheduleStatus... statuses) {
+    changeStatus(Query.taskScoped(taskId), status, statuses);
   }
 
   public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
index 7de377c..46b0b03 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
@@ -42,6 +42,7 @@ import com.twitter.aurora.gen.TaskConfig;
 import com.twitter.aurora.gen.TaskEvent;
 import com.twitter.aurora.scheduler.Driver;
 import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.Tasks;
 import com.twitter.aurora.scheduler.events.PubsubEvent;
@@ -55,6 +56,8 @@ import com.twitter.common.base.Closure;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
+import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
+import static com.twitter.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
@@ -76,6 +79,7 @@ public class StateManagerImplTest extends EasyMockTest {
 
   private Driver driver;
   private TaskIdGenerator taskIdGenerator;
+  private RescheduleCalculator rescheduleCalculator;
   private Closure<PubsubEvent> eventSink;
   private StateManagerImpl stateManager;
   private final FakeClock clock = new FakeClock();
@@ -86,9 +90,16 @@ public class StateManagerImplTest extends EasyMockTest {
     taskIdGenerator = createMock(TaskIdGenerator.class);
     driver = createMock(Driver.class);
     eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
     // TODO(William Farner): Use a mocked storage.
     storage = MemStorage.newEmptyStorage();
-    stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+    stateManager = new StateManagerImpl(
+        storage,
+        clock,
+        driver,
+        taskIdGenerator,
+        eventSink,
+        rescheduleCalculator);
   }
 
   @After
@@ -226,7 +237,8 @@ public class StateManagerImplTest extends EasyMockTest {
     // Trigger an event that produces a side-effect and a PubSub event .
     eventSink.execute(matchStateChange(id, INIT, PENDING));
     expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override public Void answer() throws Throwable {
+      @Override
+      public Void answer() throws Throwable {
         stateManager.changeState(
             Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
         return null;
@@ -255,6 +267,26 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager.deleteTasks(ImmutableSet.of(taskId));
   }
 
+  @Test
+  public void testThrottleTask() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+    String newTaskId = "b";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(newTaskId);
+    expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+        .andReturn(100L);
+    expectStateTransitions(newTaskId, INIT, THROTTLED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    stateManager.changeState(Query.taskScoped(taskId), ASSIGNED, Optional.<String>absent());
+    stateManager.changeState(Query.taskScoped(taskId), RUNNING, Optional.<String>absent());
+    stateManager.changeState(Query.taskScoped(taskId), FAILED, Optional.<String>absent());
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
index 4188772..b5732da 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
@@ -36,6 +36,7 @@ import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
+import static com.twitter.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.hamcrest.CoreMatchers.is;
@@ -317,6 +318,15 @@ public class TaskStateMachineTest extends EasyMockTest {
     transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
   }
 
+  @Test
+  public void testThrottledTask() {
+    expectWork(UPDATE_STATE).times(2);
+
+    control.replay();
+
+    transition(stateMachine, THROTTLED, PENDING);
+  }
+
   private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
     for (ScheduleStatus status : states) {
       stateMachine.updateState(status);