You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/18 01:37:48 UTC
[2/2] git commit: work in progress.
work in progress.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4acd3cca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4acd3cca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4acd3cca
Branch: refs/heads/wfarner/throttled_state
Commit: 4acd3cca37f1f362dc6c6fc24c034ef450920f4a
Parents: 65f455c
Author: Bill Farner <bi...@twitter.com>
Authored: Tue Dec 17 16:37:17 2013 -0800
Committer: Bill Farner <bi...@twitter.com>
Committed: Tue Dec 17 16:37:17 2013 -0800
----------------------------------------------------------------------
.../scheduler/async/RescheduleCalculator.java | 7 +-
.../aurora/scheduler/async/TaskThrottler.java | 30 +++
.../aurora/scheduler/events/PubsubEvent.java | 41 ----
.../scheduler/state/StateManagerImpl.java | 6 -
.../scheduler/state/TaskStateMachine.java | 25 +--
.../async/RescheduleCalculatorImplTest.java | 185 +++++++++++++++++++
.../scheduler/async/TaskSchedulerTest.java | 141 +-------------
.../state/BaseSchedulerCoreImplTest.java | 144 ++++++++-------
.../scheduler/state/StateManagerImplTest.java | 36 +++-
.../scheduler/state/TaskStateMachineTest.java | 10 +
10 files changed, 345 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
index 85e7233..1dac9c9 100644
--- a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
@@ -40,7 +40,6 @@ import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
import com.twitter.common.util.Random;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -137,11 +136,7 @@ public interface RescheduleCalculator {
}
@Inject
- RescheduleCalculatorImpl(
- Storage storage,
- RescheduleCalculatorSettings settings,
- Clock clock) {
-
+ RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) {
this.storage = checkNotNull(storage);
this.settings = checkNotNull(settings);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
new file mode 100644
index 0000000..f4a5d3a
--- /dev/null
+++ b/src/main/java/com/twitter/aurora/scheduler/async/TaskThrottler.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+
+/**
+ * Work-in-progress {@link EventSubscriber}, presumably meant to drive the new THROTTLED task state
+ * (TODO confirm). It currently declares no methods other than its constructor.
+ */
+class TaskThrottler implements EventSubscriber {
+
+ @Inject
+ TaskThrottler() {
+ // Intentionally empty: nothing is injected yet.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java b/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
index 400d8b7..5b4af7d 100644
--- a/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/com/twitter/aurora/scheduler/events/PubsubEvent.java
@@ -201,47 +201,6 @@ public interface PubsubEvent {
}
}
- public static class TaskRescheduled implements PubsubEvent {
- private final String role;
- private final String job;
- private final int instance;
-
- public TaskRescheduled(String role, String job, int instance) {
- this.role = role;
- this.job = job;
- this.instance = instance;
- }
-
- public String getRole() {
- return role;
- }
-
- public String getJob() {
- return job;
- }
-
- public int getInstance() {
- return instance;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TaskRescheduled)) {
- return false;
- }
-
- TaskRescheduled other = (TaskRescheduled) o;
- return Objects.equal(role, other.role)
- && Objects.equal(job, other.job)
- && Objects.equal(instance, other.instance);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(role, job, instance);
- }
- }
-
public static class StorageStarted implements PubsubEvent {
@Override
public boolean equals(Object o) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
index e1045fc..f208798 100644
--- a/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/com/twitter/aurora/scheduler/state/StateManagerImpl.java
@@ -367,12 +367,6 @@ public class StateManagerImpl implements StateManager {
}
createStateMachine(task).updateState(newState, Optional.of(auditMessage));
- ITaskConfig taskInfo = task.getAssignedTask().getTask();
- sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskRescheduled(
- taskInfo.getOwner().getRole(),
- taskInfo.getJobName(),
- task.getAssignedTask().getInstanceId()));
break;
case UPDATE_STATE:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
index f7c4b6d..96caa05 100644
--- a/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/com/twitter/aurora/scheduler/state/TaskStateMachine.java
@@ -78,6 +78,7 @@ class TaskStateMachine {
private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
private static final State STARTING = State.create(ScheduleStatus.STARTING);
+ private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
@VisibleForTesting
@@ -264,29 +265,23 @@ class TaskStateMachine {
}
};
+ final Closure<Transition<State>> deleteIfKilling =
+ Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+
stateMachine = StateMachine.<State>builder(taskId)
.logTransitions()
.initialState(State.create(initialState))
.addState(
Rule.from(INIT)
- .to(PENDING, UNKNOWN))
+ .to(PENDING, THROTTLED, UNKNOWN))
.addState(
Rule.from(PENDING)
.to(ASSIGNED, KILLING)
- .withCallback(
- new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case KILLING:
- addWork(WorkCommand.DELETE);
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
+ .withCallback(deleteIfKilling))
+ .addState(
+ Rule.from(THROTTLED)
+ .to(PENDING, KILLING)
+ .withCallback(deleteIfKilling))
.addState(
Rule.from(ASSIGNED)
.to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
new file mode 100644
index 0000000..2465686
--- /dev/null
+++ b/src/test/java/com/twitter/aurora/scheduler/async/RescheduleCalculatorImplTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.util.EnumSet;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskEvent;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import static com.twitter.aurora.gen.ScheduleStatus.ASSIGNED;
+import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
+import static com.twitter.aurora.gen.ScheduleStatus.INIT;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
+import static com.twitter.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Flapping-penalty tests moved out of TaskSchedulerTest, which drops its copies in this commit.
+ *
+ * <p>NOTE(review): work in progress. These tests still use members that are not declared in this
+ * class (offerQueue, OFFER_A, clock, storage, flappingStrategy, makeTask, expectOfferDeclineIn,
+ * expectTaskGroupBackoff, expectTaskRetryIn, expectTaskScheduled, replayAndCreateScheduler,
+ * changeState), so the file will not compile unless EasyMockTest provides them. Consider building
+ * a RescheduleCalculatorImpl in setUp() and asserting on getFlappingPenaltyMs() directly.
+ */
+public class RescheduleCalculatorImplTest extends EasyMockTest {
+ @Before
+ public void setUp() {
+ // NOTE(review): empty; presumably should create the storage, mocks and calculator under test.
+ }
+
+ /** Without an ancestor, no backoff is calculated and no delayed retry is expected. */
+ @Test
+ public void testNoPenaltyForNoAncestor() {
+ // If a task doesn't have an ancestor there should be no penalty for flapping.
+ IScheduledTask task = makeTask("a1", INIT);
+
+ expectOfferDeclineIn(10);
+ Capture<Runnable> first = expectTaskGroupBackoff(1);
+ expectTaskScheduled(task);
+
+ replayAndCreateScheduler();
+ offerQueue.addOffer(OFFER_A);
+
+ changeState(task, INIT, PENDING);
+
+ first.getValue().run();
+ }
+
+ /**
+ * Schedules a3, whose ancestry a2 -> a1 -> a0 is all flappy, but expects only two backoff
+ * calculations: the strategy returns 5 ms twice, which presumably ends the ancestry walk early.
+ */
+ @Test
+ public void testFlappingTasksBackoffTruncation() {
+ makeFlappyTask("a0", null);
+ makeFlappyTask("a1", "a0");
+ makeFlappyTask("a2", "a1");
+ IScheduledTask taskA3 = IScheduledTask.build(makeTask("a3", INIT).newBuilder()
+ .setAncestorId("a2"));
+
+ expectOfferDeclineIn(10);
+
+ Capture<Runnable> first = expectTaskGroupBackoff(10);
+ // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
+ // entire history.
+ expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
+ expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
+ Capture<Runnable> flapping = expectTaskRetryIn(10);
+
+ expectTaskScheduled(taskA3);
+
+ replayAndCreateScheduler();
+ offerQueue.addOffer(OFFER_A);
+
+ changeState(taskA3, INIT, PENDING);
+
+ first.getValue().run();
+ clock.waitFor(10);
+ flapping.getValue().run();
+ }
+
+ /** One flappy ancestor (a0) causes one backoff calculation and a delayed retry of a1. */
+ @Test
+ public void testFlappingTasks() {
+ makeFlappyTask("a0", null);
+ IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
+ .setAncestorId("a0"));
+
+ expectOfferDeclineIn(10);
+ Capture<Runnable> first = expectTaskGroupBackoff(10);
+
+ expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
+ // Since A1 has been penalized, the task has to wait for another 10 ms until the penalty has
+ // expired.
+ Capture<Runnable> flapping = expectTaskRetryIn(10);
+
+ expectTaskScheduled(taskA1);
+
+ replayAndCreateScheduler();
+
+ offerQueue.addOffer(OFFER_A);
+
+ changeState(taskA1, INIT, PENDING);
+
+ first.getValue().run();
+ clock.waitFor(10);
+ flapping.getValue().run();
+ }
+
+ /**
+ * An ancestor whose history has RESTARTING instead of RUNNING (it was interrupted) must not
+ * penalize its descendant: no backoff calculation and no delayed retry are expected.
+ */
+ @Test
+ public void testNoPenaltyForInterruptedTasks() {
+ makeFlappyTaskWithStates("a0", EnumSet.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED), null);
+ IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
+ .setAncestorId("a0"));
+
+ expectOfferDeclineIn(10);
+ Capture<Runnable> first = expectTaskGroupBackoff(10);
+
+ expectTaskScheduled(taskA1);
+
+ replayAndCreateScheduler();
+
+ offerQueue.addOffer(OFFER_A);
+
+ changeState(taskA1, INIT, PENDING);
+
+ first.getValue().run();
+ }
+
+ /**
+ * Builds a task from {@code makeTask(taskId, INIT)} whose event history has one event per element
+ * of {@code states}. Each event is stamped with the current {@code clock} time, and the clock is
+ * advanced 10 seconds after every event. The ancestor is set to {@code ancestorId}, and the task
+ * is saved to {@code storage} unless a task with the same id is already stored.
+ *
+ * <p>NOTE(review): callers pass an {@link EnumSet}, which iterates in ScheduleStatus declaration
+ * order rather than argument order; confirm that order matches the intended history.
+ *
+ * @return the constructed task
+ */
+ private IScheduledTask makeFlappyTaskWithStates(
+ String taskId,
+ Iterable<ScheduleStatus> states,
+ @Nullable String ancestorId) {
+
+ Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
+
+ ScheduledTask base = makeTask(taskId, INIT).newBuilder();
+
+ for (ScheduleStatus status : states) {
+ base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
+ clock.advance(timeInState);
+ }
+
+ base.setAncestorId(ancestorId);
+
+ final IScheduledTask result = IScheduledTask.build(base);
+
+ // Insert the task if it doesn't already exist.
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+ if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
+ taskStore.saveTasks(ImmutableSet.of(result));
+ }
+ }
+ });
+
+ return result;
+ }
+
+ /** Shorthand for a task whose history contains INIT, PENDING, ASSIGNED, RUNNING and FAILED. */
+ private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
+ return makeFlappyTaskWithStates(
+ taskId,
+ EnumSet.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
+ ancestorId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
index a747f2b..5ac81e4 100644
--- a/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/async/TaskSchedulerTest.java
@@ -151,8 +151,7 @@ public class TaskSchedulerTest extends EasyMockTest {
new RescheduleCalculatorSettings(
flappingStrategy,
flappingThreshold,
- Amount.of(5, Time.SECONDS)),
- clock),
+ Amount.of(5, Time.SECONDS))),
preemptor);
}
@@ -588,144 +587,6 @@ public class TaskSchedulerTest extends EasyMockTest {
timeoutCapture.getValue().run();
}
- @Test
- public void testNoPenaltyForNoAncestor() {
- // If a task doesn't have an ancestor there should be no penality for flapping.
- expectAnyMaintenanceCalls();
- IScheduledTask task = makeTask("a1", INIT);
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(1);
- expectTaskScheduled(task);
-
- replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
-
- changeState(task, INIT, PENDING);
-
- first.getValue().run();
- }
-
- @Test
- public void testFlappingTasksBackoffTruncation() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTask("a0", null);
- makeFlappyTask("a1", "a0");
- makeFlappyTask("a2", "a1");
- IScheduledTask taskA3 = IScheduledTask.build(makeTask("a3", INIT).newBuilder()
- .setAncestorId("a2"));
-
- expectOfferDeclineIn(10);
-
- Capture<Runnable> first = expectTaskGroupBackoff(10);
- // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
- // entire history.
- expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
- expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
- Capture<Runnable> flapping = expectTaskRetryIn(10);
-
- expectTaskScheduled(taskA3);
-
- replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA3, INIT, PENDING);
-
- first.getValue().run();
- clock.waitFor(10);
- flapping.getValue().run();
- }
-
- @Test
- public void testFlappingTasks() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTask("a0", null);
- IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
- .setAncestorId("a0"));
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(10);
-
- expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
- // Since A1 has been penalized, the task has to wait for another 10 ms until the penalty has
- // expired.
- Capture<Runnable> flapping = expectTaskRetryIn(10);
-
- expectTaskScheduled(taskA1);
-
- replayAndCreateScheduler();
-
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA1, INIT, PENDING);
-
- first.getValue().run();
- clock.waitFor(10);
- flapping.getValue().run();
- }
-
- @Test
- public void testNoPenaltyForInterruptedTasks() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTaskWithStates("a0", EnumSet.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED), null);
- IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
- .setAncestorId("a0"));
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(10);
-
- expectTaskScheduled(taskA1);
-
- replayAndCreateScheduler();
-
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA1, INIT, PENDING);
-
- first.getValue().run();
- }
-
- private IScheduledTask makeFlappyTaskWithStates(
- String taskId,
- Iterable<ScheduleStatus> states,
- @Nullable String ancestorId) {
-
- Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
-
- ScheduledTask base = makeTask(taskId, INIT).newBuilder();
-
- for (ScheduleStatus status : states) {
- base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
- clock.advance(timeInState);
- }
-
- base.setAncestorId(ancestorId);
-
- final IScheduledTask result = IScheduledTask.build(base);
-
- // Insert the task if it doesn't already exist.
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
- taskStore.saveTasks(ImmutableSet.of(result));
- }
- }
- });
-
- return result;
- }
-
- private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
- return makeFlappyTaskWithStates(
- taskId,
- EnumSet.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
- ancestorId);
- }
-
private TaskInfo makeTaskInfo(IScheduledTask task) {
return TaskInfo.newBuilder()
.setName(Tasks.id(task))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 3e4a502..f9fb87c 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -41,6 +41,7 @@ import com.google.common.collect.Sets;
import org.apache.mesos.Protos.SlaveID;
import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -60,6 +61,7 @@ import com.twitter.aurora.gen.TaskQuery;
import com.twitter.aurora.gen.ValueConstraint;
import com.twitter.aurora.scheduler.Driver;
import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator;
import com.twitter.aurora.scheduler.base.JobKeys;
import com.twitter.aurora.scheduler.base.Query;
import com.twitter.aurora.scheduler.base.ScheduleException;
@@ -75,7 +77,6 @@ import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
import com.twitter.aurora.scheduler.storage.StorageBackfill;
import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IIdentity;
import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
import com.twitter.aurora.scheduler.storage.entities.IJobKey;
import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
@@ -139,6 +140,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private CronJobManager cron;
private FakeClock clock;
private Closure<PubsubEvent> eventSink;
+ private RescheduleCalculator rescheduleCalculator;
private ShutdownRegistry shutdownRegistry;
private JobFilter jobFilter;
@@ -156,6 +158,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
clock = new FakeClock();
eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
eventSink.execute(EasyMock.<PubsubEvent>anyObject());
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
cronScheduler = createMock(CronScheduler.class);
shutdownRegistry = createMock(ShutdownRegistry.class);
jobFilter = createMock(JobFilter.class);
@@ -186,12 +189,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private void buildScheduler(Storage newStorage) throws Exception {
this.storage = newStorage;
storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
StorageBackfill.backfill(storeProvider, clock);
}
});
- stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+ stateManager = new StateManagerImpl(
+ storage,
+ clock,
+ driver,
+ taskIdGenerator,
+ eventSink,
+ rescheduleCalculator);
ImmediateJobManager immediateManager = new ImmediateJobManager(stateManager, storage);
cron = new CronJobManager(stateManager, storage, cronScheduler, shutdownRegistry);
scheduler = new SchedulerCoreImpl(
@@ -414,16 +424,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
}
- @Test
- public void testSortableTaskIds() throws Exception {
- control.replay();
- buildScheduler();
-
- for (IScheduledTask task : getTasks(Query.unscoped())) {
- assertEquals(IIdentity.build(OWNER_A), task.getAssignedTask().getTask().getOwner());
- }
- }
-
@Test(expected = ScheduleException.class)
public void testCreateDuplicateJob() throws Exception {
control.replay();
@@ -579,7 +579,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assertTaskCount(1);
assertEquals(PENDING, getTask(taskId).getStatus());
- changeStatus(Query.taskScoped(taskId), ASSIGNED);
+ changeStatus(taskId, ASSIGNED);
scheduler.startCronJob(KEY_A);
assertTaskCount(2);
@@ -651,20 +651,28 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
}
+ /**
+ * Expects the flapping penalty of any task to be looked up (once, unless the caller adjusts it
+ * with {@code times(n)}) and answers 0 ms, which this test class treats as "not throttled".
+ *
+ * @return the expectation setters, so callers can change the expected call count
+ */
+ private IExpectationSetters<Long> expectTaskNotThrottled() {
+ IExpectationSetters<Long> penaltyLookup =
+ expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()));
+ return penaltyLookup.andReturn(0L);
+ }
+
@Test
public void testServiceTasksRescheduled() throws Exception {
+ int numServiceTasks = 5;
+
+ expectTaskNotThrottled().times(numServiceTasks);
+
control.replay();
buildScheduler();
// Schedule 5 service and 5 non-service tasks.
- scheduler.createJob(makeJob(KEY_A, 5));
+ scheduler.createJob(makeJob(KEY_A, numServiceTasks));
TaskConfig task = productionTask().setIsService(true);
scheduler.createJob(
makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
assertEquals(10, getTasksByStatus(PENDING).size());
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
assertEquals(10, getTasksByStatus(STARTING).size());
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
@@ -685,11 +693,14 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testServiceTaskIgnoresMaxFailures() throws Exception {
+ int totalFailures = 10;
+
+ expectTaskNotThrottled().times(totalFailures);
+
control.replay();
buildScheduler();
int maxFailures = 5;
- int totalFailures = 10;
// Schedule a service task.
TaskConfig task = productionTask()
@@ -703,9 +714,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
String taskId = Tasks.id(
getOnlyTask(Query.jobScoped(KEY_A).active()));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
assertEquals(i - 1, getTask(taskId).getFailureCount());
changeStatus(taskId, FAILED);
@@ -720,27 +729,26 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testTaskRescheduleOnKill() throws Exception {
+ // makeJob(KEY_A, n) creates non-service tasks; service tasks need setIsService(true).
+ int numTasks = 5;
+
+ // Expect one (unthrottled) penalty lookup per task rescheduled after being KILLED.
+ expectTaskNotThrottled().times(numTasks);
+
control.replay();
buildScheduler();
- // Create 5 non-service and 5 service tasks.
- scheduler.createJob(makeJob(KEY_A, 5));
- TaskConfig task = productionTask().setIsService(true);
- scheduler.createJob(
- makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+ scheduler.createJob(makeJob(KEY_A, numTasks));
- assertEquals(10, getTasksByStatus(PENDING).size());
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
- assertEquals(10, getTasksByStatus(STARTING).size());
+ assertEquals(numTasks, getTasksByStatus(PENDING).size());
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
+ assertEquals(numTasks, getTasksByStatus(STARTING).size());
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
- assertEquals(10, getTasksByStatus(RUNNING).size());
+ assertEquals(numTasks, getTasksByStatus(RUNNING).size());
// All tasks will move back into PENDING state after getting KILLED.
changeStatus(Query.roleScoped(ROLE_A), KILLED);
Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
- assertEquals(10, newTasks.size());
- assertEquals(10, getTasksByStatus(KILLED).size());
+ assertEquals(numTasks, newTasks.size());
+ assertEquals(numTasks, getTasksByStatus(KILLED).size());
}
@Test
@@ -751,9 +759,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
buildScheduler();
scheduler.createJob(makeJob(KEY_A, 1));
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
- changeStatus(Query.roleScoped(ROLE_A), RUNNING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING, RUNNING);
scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
changeStatus(Query.roleScoped(ROLE_A), KILLED);
@@ -767,6 +773,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testFailedTaskIncrementsFailureCount() throws Exception {
int maxFailures = 5;
+ expectTaskNotThrottled().times(maxFailures - 1);
+
control.replay();
buildScheduler();
@@ -780,9 +788,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
String taskId = Tasks.id(getOnlyTask(
Query.jobScoped(KEY_A).active()));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
assertEquals(i - 1, getTask(taskId).getFailureCount());
changeStatus(taskId, FAILED);
@@ -821,8 +827,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assertTaskCount(10);
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
assertTaskCount(10);
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
assertTaskCount(10);
@@ -897,9 +902,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.createJob(makeJob(KEY_A, 1));
String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
assertEquals(KILLING, getTask(taskId).getStatus());
assertEquals(1, getTasks(Query.roleScoped(ROLE_A)).size());
@@ -927,29 +930,25 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testLostTaskRescheduled() throws Exception {
expectKillTask(2);
+ // Each of the two LOST transitions reschedules the task, and neither reschedule is throttled.
+ expectTaskNotThrottled().times(2);
control.replay();
buildScheduler();
- int maxFailures = 5;
- TaskConfig task = productionTask().setMaxTaskFailures(maxFailures);
- scheduler.createJob(makeJob(KEY_A, task, 1));
+ scheduler.createJob(makeJob(KEY_A, 1));
assertTaskCount(1);
Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
+ String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
assertEquals(1, tasks.size());
- changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
+ changeStatus(taskId, ASSIGNED, LOST);
- Query.Builder pendingQuery = Query.unscoped().byStatus(PENDING);
- changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
- assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
- assertTaskCount(2);
+ // The LOST task stays in the store and a replacement with a new id becomes PENDING.
+ String newTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
+ assertFalse(newTaskId.equals(taskId));
+ assertTaskCount(2);
- changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
- changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
- assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
- assertTaskCount(3);
+ changeStatus(newTaskId, ASSIGNED, LOST);
+ String thirdTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
+ assertFalse(newTaskId.equals(thirdTaskId));
+ assertTaskCount(3);
}
@Test
@@ -1019,6 +1018,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testSlaveDeletesTasks() throws Exception {
+ expectTaskNotThrottled();
+
control.replay();
buildScheduler();
@@ -1031,10 +1032,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assignTask(taskId1, SLAVE_ID, SLAVE_HOST_1);
assignTask(taskId2, SLAVE_ID, SLAVE_HOST_1);
- changeStatus(taskId1, STARTING);
- changeStatus(taskId1, RUNNING);
- changeStatus(taskId2, STARTING);
- changeStatus(taskId2, FINISHED);
+ changeStatus(taskId1, STARTING, RUNNING);
+ changeStatus(taskId2, STARTING, FINISHED);
scheduler.tasksDeleted(ImmutableSet.of(taskId1, taskId2));
@@ -1051,13 +1050,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testRestartShards() throws Exception {
expectKillTask(2);
+ expectTaskNotThrottled().times(2);
control.replay();
buildScheduler();
scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
- changeStatus(Query.jobScoped(KEY_A), RUNNING);
+ changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
@@ -1067,12 +1066,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testRestartNonexistentShard() throws Exception {  // Restarting a shard id absent from the job must raise ScheduleException.
+ expectTaskNotThrottled();
+ // NOTE(review): presumably consumed when the FINISHED service task is rescheduled — confirm.
control.replay();
buildScheduler();
scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
- changeStatus(Query.jobScoped(KEY_A), FINISHED);
+ changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);  // Applies ASSIGNED, then FINISHED, to every task in the job.
scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);  // The job was created with 1 instance, so shard 5 is nonexistent.
}
@@ -1109,6 +1109,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testPortResourceResetAfterReschedule() throws Exception {
expectKillTask(1);
+ expectTaskNotThrottled();
control.replay();
buildScheduler();
@@ -1141,8 +1142,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.createJob(makeJob(KEY_A, 1));
String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
+ changeStatus(taskId, ASSIGNED, STARTING);
changeStatus(taskId, FAILED, Optional.of("bad stuff happened"));
String hostname = getLocalHost();
@@ -1458,12 +1458,16 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.setTaskStatus(query, status, message);
}
- public void changeStatus(Query.Builder query, ScheduleStatus status) {
- changeStatus(query, status, Optional.<String>absent());
+ public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {  // Applies status, then each of statuses in order, to the tasks matched by query.
+ for (ScheduleStatus nextStatus
+ : ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
+ // One changeStatus(query, ..., absent()) call per status, in argument order, with no message.
+ changeStatus(query, nextStatus, Optional.<String>absent());
+ }
}
- public void changeStatus(String taskId, ScheduleStatus status) {
- changeStatus(taskId, status, Optional.<String>absent());
+ public void changeStatus(String taskId, ScheduleStatus status, ScheduleStatus... statuses) {  // Task-id convenience form of the Query.Builder overload.
+ changeStatus(Query.taskScoped(taskId), status, statuses);  // Scopes the query to this single task id and delegates.
}
public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
index 7de377c..46b0b03 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/StateManagerImplTest.java
@@ -42,6 +42,7 @@ import com.twitter.aurora.gen.TaskConfig;
import com.twitter.aurora.gen.TaskEvent;
import com.twitter.aurora.scheduler.Driver;
import com.twitter.aurora.scheduler.TaskIdGenerator;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator;
import com.twitter.aurora.scheduler.base.Query;
import com.twitter.aurora.scheduler.base.Tasks;
import com.twitter.aurora.scheduler.events.PubsubEvent;
@@ -55,6 +56,8 @@ import com.twitter.common.base.Closure;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
+import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
+import static com.twitter.aurora.gen.ScheduleStatus.THROTTLED;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
@@ -76,6 +79,7 @@ public class StateManagerImplTest extends EasyMockTest {
private Driver driver;
private TaskIdGenerator taskIdGenerator;
+ private RescheduleCalculator rescheduleCalculator;
private Closure<PubsubEvent> eventSink;
private StateManagerImpl stateManager;
private final FakeClock clock = new FakeClock();
@@ -86,9 +90,16 @@ public class StateManagerImplTest extends EasyMockTest {
taskIdGenerator = createMock(TaskIdGenerator.class);
driver = createMock(Driver.class);
eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
// TODO(William Farner): Use a mocked storage.
storage = MemStorage.newEmptyStorage();
- stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+ stateManager = new StateManagerImpl(
+ storage,
+ clock,
+ driver,
+ taskIdGenerator,
+ eventSink,
+ rescheduleCalculator);
}
@After
@@ -226,7 +237,8 @@ public class StateManagerImplTest extends EasyMockTest {
// Trigger an event that produces a side-effect and a PubSub event .
eventSink.execute(matchStateChange(id, INIT, PENDING));
expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override public Void answer() throws Throwable {
+ @Override
+ public Void answer() throws Throwable {
stateManager.changeState(
Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
return null;
@@ -255,6 +267,26 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager.deleteTasks(ImmutableSet.of(taskId));
}
+ @Test
+ public void testThrottleTask() {  // A failed service task's replacement with a nonzero flapping penalty is expected to go INIT -> THROTTLED.
+ ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+ String taskId = "a";  // First generated id: the originally inserted task.
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+ String newTaskId = "b";  // Second generated id: the task created after "a" reaches FAILED.
+ expect(taskIdGenerator.generate(task, 0)).andReturn(newTaskId);
+ expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+ .andReturn(100L);  // Nonzero penalty (ms); the expectation below requires the new task to land in THROTTLED.
+ expectStateTransitions(newTaskId, INIT, THROTTLED);
+
+ control.replay();
+
+ insertTask(task, 0);
+ stateManager.changeState(Query.taskScoped(taskId), ASSIGNED, Optional.<String>absent());
+ stateManager.changeState(Query.taskScoped(taskId), RUNNING, Optional.<String>absent());
+ stateManager.changeState(Query.taskScoped(taskId), FAILED, Optional.<String>absent());
+ }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4acd3cca/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
index 4188772..b5732da 100644
--- a/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/state/TaskStateMachineTest.java
@@ -36,6 +36,7 @@ import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
+import static com.twitter.aurora.gen.ScheduleStatus.THROTTLED;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
import static org.hamcrest.CoreMatchers.is;
@@ -317,6 +318,15 @@ public class TaskStateMachineTest extends EasyMockTest {
transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
}
+ @Test
+ public void testThrottledTask() {  // THROTTLED -> PENDING must be an accepted path through the state machine.
+ expectWork(UPDATE_STATE).times(2);  // One UPDATE_STATE work item per transition below (THROTTLED, then PENDING).
+
+ control.replay();
+
+ transition(stateMachine, THROTTLED, PENDING);
+ }
+
private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
for (ScheduleStatus status : states) {
stateMachine.updateState(status);