You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/14 21:17:14 UTC
git commit: When rescheduling a task,
send it to the THROTTLED state if it has been penalized for flapping.
Updated Branches:
refs/heads/master e1aee67b7 -> a584410c4
When rescheduling a task, send it to the THROTTLED state if it has been
penalized for flapping.
Bugs closed: AURORA-23
Reviewed at https://reviews.apache.org/r/16740/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a584410c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a584410c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a584410c
Branch: refs/heads/master
Commit: a584410c49d615bea6198bed6f0c5ec52c411b0b
Parents: e1aee67
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 14 12:14:54 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 14 12:14:54 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/AsyncModule.java | 18 +-
.../aurora/scheduler/async/Preemptor.java | 2 +-
.../scheduler/async/RescheduleCalculator.java | 47 ++---
.../aurora/scheduler/async/TaskGroup.java | 1 -
.../aurora/scheduler/async/TaskGroups.java | 9 +-
.../aurora/scheduler/async/TaskThrottler.java | 87 +++++++++
.../org/apache/aurora/scheduler/base/Tasks.java | 7 +-
.../aurora/scheduler/events/PubsubEvent.java | 41 ----
.../aurora/scheduler/http/SchedulerzJob.java | 24 +--
.../aurora/scheduler/http/SchedulerzRole.java | 4 +-
.../scheduler/state/StateManagerImpl.java | 39 ++--
.../scheduler/state/TaskStateMachine.java | 27 ++-
.../scheduler/storage/StorageBackfill.java | 19 --
.../thrift/org/apache/aurora/gen/api.thrift | 3 +-
.../async/RescheduleCalculatorImplTest.java | 189 +++++++++++++++++++
.../scheduler/async/TaskSchedulerTest.java | 149 +--------------
.../scheduler/async/TaskThrottlerTest.java | 137 ++++++++++++++
.../state/BaseSchedulerCoreImplTest.java | 144 +++++++-------
.../scheduler/state/StateManagerImplTest.java | 36 +++-
.../scheduler/state/TaskStateMachineTest.java | 11 ++
.../scheduler/storage/StorageBackfillTest.java | 92 ---------
21 files changed, 619 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 6be658d..72d3621 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -162,7 +162,7 @@ public class AsyncModule extends AbstractModule {
// AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
// a MultiBinder, which cannot span multiple injectors.
- binder().install(new PrivateModule() {
+ install(new PrivateModule() {
@Override protected void configure() {
bind(new TypeLiteral<Amount<Long, Time>>() { })
.toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
@@ -175,7 +175,7 @@ public class AsyncModule extends AbstractModule {
});
PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
- binder().install(new PrivateModule() {
+ install(new PrivateModule() {
@Override protected void configure() {
bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
@@ -188,6 +188,7 @@ public class AsyncModule extends AbstractModule {
MAX_RESCHEDULING_DELAY.get()));
bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
+ expose(RescheduleCalculator.class);
if (ENABLE_PREEMPTOR.get()) {
bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
bind(PreemptorImpl.class).in(Singleton.class);
@@ -206,7 +207,7 @@ public class AsyncModule extends AbstractModule {
bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
- binder().install(new PrivateModule() {
+ install(new PrivateModule() {
@Override protected void configure() {
bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
bind(ScheduledExecutorService.class).toInstance(executor);
@@ -217,7 +218,7 @@ public class AsyncModule extends AbstractModule {
});
PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
- binder().install(new PrivateModule() {
+ install(new PrivateModule() {
@Override protected void configure() {
// TODO(ksweeney): Create a configuration validator module so this can be injected.
// TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
@@ -231,6 +232,15 @@ public class AsyncModule extends AbstractModule {
}
});
PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+
+ install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(ScheduledExecutorService.class).toInstance(executor);
+ bind(TaskThrottler.class).in(Singleton.class);
+ expose(TaskThrottler.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 0afbef9..b190a00 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -121,7 +121,7 @@ public interface Preemptor {
private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
@Override public boolean apply(IScheduledTask task) {
- return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
+ return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
>= preemptionCandidacyDelay.as(Time.MILLISECONDS);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index 0265bf9..a9ee32b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Lists;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
import com.twitter.common.util.Random;
import org.apache.aurora.gen.ScheduleStatus;
@@ -51,32 +50,31 @@ import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
/**
* Calculates scheduling delays for tasks.
*/
-interface RescheduleCalculator {
+public interface RescheduleCalculator {
/**
- * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
+ * Calculates the delay, in milliseconds, before the task should be considered eligible for
+ * (re)scheduling at scheduler startup.
*
- * @param task Task to calculate timestamp for.
- * @return Timestamp in msec.
+ * @param task Task to calculate delay for.
+ * @return Delay in msec.
*/
- long getStartupReadyTimeMs(IScheduledTask task);
+ long getStartupScheduleDelayMs(IScheduledTask task);
/**
- * Gets a timestamp for the task to become eligible for (re)scheduling.
+ * Calculates the flapping penalty, in milliseconds, that a task must wait out before being
+ * eligible for rescheduling.
*
- * @param task Task to calculate timestamp for.
- * @return Timestamp in msec.
+ * @param task Task to calculate delay for.
+ * @return Delay in msec.
*/
- long getReadyTimeMs(IScheduledTask task);
+ long getFlappingPenaltyMs(IScheduledTask task);
- // TODO(wfarner): Create a unit test for this class. It currently piggybacks on
- // TaskSchedulerTest. Once a unit test exists, TaskSchedulerTest should use a mock.
class RescheduleCalculatorImpl implements RescheduleCalculator {
private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
private final Storage storage;
private final RescheduleCalculatorSettings settings;
- private final Clock clock;
// TODO(wfarner): Inject 'random' in the constructor for better test coverage.
private final Random random = new Random.SystemRandom(new java.util.Random());
@@ -138,25 +136,15 @@ interface RescheduleCalculator {
}
@Inject
- RescheduleCalculatorImpl(
- Storage storage,
- RescheduleCalculatorSettings settings,
- Clock clock) {
-
+ RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) {
this.storage = checkNotNull(storage);
this.settings = checkNotNull(settings);
- this.clock = checkNotNull(clock);
}
@Override
- public long getStartupReadyTimeMs(IScheduledTask task) {
- return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
- + getTaskReadyTimestamp(task);
- }
-
- @Override
- public long getReadyTimeMs(IScheduledTask task) {
- return getTaskReadyTimestamp(task);
+ public long getStartupScheduleDelayMs(IScheduledTask task) {
+ return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue())
+ + getFlappingPenaltyMs(task);
}
private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
@@ -170,7 +158,8 @@ interface RescheduleCalculator {
return Optional.fromNullable(Iterables.getOnlyElement(res, null));
}
- private long getTaskReadyTimestamp(IScheduledTask task) {
+ @Override
+ public long getFlappingPenaltyMs(IScheduledTask task) {
Optional<IScheduledTask> curTask = getTaskAncestor(task);
long penaltyMs = 0;
while (curTask.isPresent() && flapped.apply(curTask.get())) {
@@ -185,7 +174,7 @@ interface RescheduleCalculator {
curTask = getTaskAncestor(curTask.get());
}
- return penaltyMs + clock.nowMillis();
+ return penaltyMs;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
index 1a00874..a834f44 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -115,7 +115,6 @@ class TaskGroup {
return key.toString();
}
- // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
public Set<String> getTaskIds() {
return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index b50c625..702b0df 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -201,8 +201,9 @@ public class TaskGroups implements EventSubscriber {
return executor;
}
- private synchronized void add(IAssignedTask task, long readyTimestamp) {
- groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
+ private synchronized void add(IAssignedTask task, long scheduleDelayMs) {
+ groups.getUnchecked(new GroupKey(task.getTask()))
+ .push(task.getTaskId(), clock.nowMillis() + scheduleDelayMs);
}
/**
@@ -218,8 +219,8 @@ public class TaskGroups implements EventSubscriber {
if (stateChange.getNewState() == PENDING) {
IScheduledTask task = stateChange.getTask();
long readyAtMs = stateChange.isTransition()
- ? rescheduleCalculator.getReadyTimeMs(task)
- : rescheduleCalculator.getStartupReadyTimeMs(task);
+ ? rescheduleCalculator.getFlappingPenaltyMs(task)
+ : rescheduleCalculator.getStartupScheduleDelayMs(task);
add(task.getAssignedTask(), readyAtMs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
new file mode 100644
index 0000000..11a01ea
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+
+/**
+ * A holding area for tasks that have been throttled. Tasks entering the
+ * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED} state will be transitioned to
+ * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} after the penalty period (as dictated by
+ * {@link RescheduleCalculator}) has expired.
+ */
+class TaskThrottler implements EventSubscriber {
+
+ private final RescheduleCalculator rescheduleCalculator;
+ private final Clock clock;
+ private final ScheduledExecutorService executor;
+ private final StateManager stateManager;
+
+ private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
+
+ @Inject
+ TaskThrottler(
+ RescheduleCalculator rescheduleCalculator,
+ Clock clock,
+ ScheduledExecutorService executor,
+ StateManager stateManager) {
+
+ this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
+ this.clock = checkNotNull(clock);
+ this.executor = checkNotNull(executor);
+ this.stateManager = checkNotNull(stateManager);
+ }
+
+ @Subscribe
+ public void taskChangedState(final TaskStateChange stateChange) {
+ if (stateChange.getNewState() == THROTTLED) {
+ long readyAtMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp()
+ + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
+ long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
+ throttleStats.accumulate(delayMs);
+ executor.schedule(
+ new Runnable() {
+ @Override public void run() {
+ stateManager.changeState(
+ Query.taskScoped(stateChange.getTaskId()).byStatus(THROTTLED),
+ PENDING,
+ Optional.<String>absent());
+ }
+ },
+ delayMs,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 569e8c3..06a19d8 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -37,6 +37,7 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
/**
* Utility class providing convenience functions relating to tasks.
@@ -177,10 +178,14 @@ public final class Tasks {
return task.getAssignedTask().getTask().getJobName();
}
+ public static ITaskEvent getLatestEvent(IScheduledTask task) {
+ return Iterables.getLast(task.getTaskEvents());
+ }
+
public static final Ordering<IScheduledTask> LATEST_ACTIVITY = Ordering.natural()
.onResultOf(new Function<IScheduledTask, Long>() {
@Override public Long apply(IScheduledTask task) {
- return Iterables.getLast(task.getTaskEvents()).getTimestamp();
+ return getLatestEvent(task).getTimestamp();
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 59e18ea..2669781 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -222,47 +222,6 @@ public interface PubsubEvent {
}
}
- public static class TaskRescheduled implements PubsubEvent {
- private final String role;
- private final String job;
- private final int instance;
-
- public TaskRescheduled(String role, String job, int instance) {
- this.role = role;
- this.job = job;
- this.instance = instance;
- }
-
- public String getRole() {
- return role;
- }
-
- public String getJob() {
- return job;
- }
-
- public int getInstance() {
- return instance;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TaskRescheduled)) {
- return false;
- }
-
- TaskRescheduled other = (TaskRescheduled) o;
- return Objects.equal(role, other.role)
- && Objects.equal(job, other.job)
- && Objects.equal(instance, other.instance);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(role, job, instance);
- }
- }
-
public static class DriverRegistered implements PubsubEvent {
@Override
public boolean equals(Object o) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
index 07a648f..1e0904f 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
@@ -17,7 +17,6 @@ package org.apache.aurora.scheduler.http;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -65,7 +64,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
-import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.twitter.common.base.MorePreconditions.checkNotBlank;
@@ -103,24 +101,6 @@ public class SchedulerzJob extends JerseyTemplateServlet {
.put(FAILED, EnumSet.of(LOST, FAILED))
.build();
- private static final Comparator<IScheduledTask> REVERSE_CHRON_COMPARATOR =
- new Comparator<IScheduledTask>() {
- @Override public int compare(IScheduledTask taskA, IScheduledTask taskB) {
- // Sort in reverse chronological order.
- Iterable<ITaskEvent> taskAEvents = taskA.getTaskEvents();
- Iterable<ITaskEvent> taskBEvents = taskB.getTaskEvents();
-
- boolean taskAHasEvents = taskAEvents != null && !Iterables.isEmpty(taskAEvents);
- boolean taskBHasEvents = taskBEvents != null && !Iterables.isEmpty(taskBEvents);
- if (taskAHasEvents && taskBHasEvents) {
- return Long.signum(Iterables.getLast(taskBEvents).getTimestamp()
- - Iterables.getLast(taskAEvents).getTimestamp());
- } else {
- return 0;
- }
- }
- };
-
private static final Function<Veto, String> GET_REASON = new Function<Veto, String>() {
@Override public String apply(Veto veto) {
return veto.getReason();
@@ -165,7 +145,7 @@ public class SchedulerzJob extends JerseyTemplateServlet {
.put("instanceId", task.getInstanceId())
.put("slaveHost", task.isSetSlaveHost() ? task.getSlaveHost() : "")
.put("status", scheduledTask.getStatus())
- .put("statusTimestamp", Iterables.getLast(scheduledTask.getTaskEvents()).getTimestamp())
+ .put("statusTimestamp", Tasks.getLatestEvent(scheduledTask).getTimestamp())
.put("taskEvents", scheduledTask.getTaskEvents());
if (scheduledTask.getStatus() == ScheduleStatus.PENDING) {
@@ -411,7 +391,7 @@ public class SchedulerzJob extends JerseyTemplateServlet {
if (completedQuery.isPresent()) {
List<IScheduledTask> completedTasks = Lists.newArrayList(
Storage.Util.weaklyConsistentFetchTasks(storage, completedQuery.get()));
- Collections.sort(completedTasks, REVERSE_CHRON_COMPARATOR);
+ Collections.sort(completedTasks, Tasks.LATEST_ACTIVITY.reverse());
template.setAttribute("completedTasks",
ImmutableList.copyOf(
Iterables.transform(offsetAndLimit(completedTasks, offset), taskToStringMap)));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
index cef0ff2..785efd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
@@ -294,9 +294,7 @@ public class SchedulerzRole extends JerseyTemplateServlet {
case UNKNOWN:
job.failedTaskCount++;
Date now = new Date();
- long elapsedMillis = now.getTime()
- - Iterables.getLast(task.getTaskEvents()).getTimestamp();
-
+ long elapsedMillis = now.getTime() - Tasks.getLatestEvent(task).getTimestamp();
if (Amount.of(elapsedMillis, Time.MILLISECONDS).as(Time.HOURS) < 6) {
job.recentlyFailedTaskCount++;
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index b6dd537..256f830 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -46,6 +46,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
@@ -68,6 +69,7 @@ import static com.twitter.common.base.MorePreconditions.checkNotBlank;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
@@ -82,14 +84,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
public class StateManagerImpl implements StateManager {
private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
- private final SideEffectStorage storage;
@VisibleForTesting
SideEffectStorage getStorage() {
return storage;
}
- private final TaskIdGenerator taskIdGenerator;
-
// Work queue to receive state machine side effect work.
// Items are sorted to place DELETE entries last. This is to ensure that within an operation,
// a delete is always processed after a state transition.
@@ -129,8 +128,11 @@ public class StateManagerImpl implements StateManager {
}
};
- private final Driver driver;
+ private final SideEffectStorage storage;
private final Clock clock;
+ private final Driver driver;
+ private final TaskIdGenerator taskIdGenerator;
+ private final RescheduleCalculator rescheduleCalculator;
/**
* An item of work on the work queue.
@@ -157,7 +159,8 @@ public class StateManagerImpl implements StateManager {
final Clock clock,
Driver driver,
TaskIdGenerator taskIdGenerator,
- EventSink eventSink) {
+ EventSink eventSink,
+ RescheduleCalculator rescheduleCalculator) {
checkNotNull(storage);
this.clock = checkNotNull(clock);
@@ -171,6 +174,7 @@ public class StateManagerImpl implements StateManager {
this.storage = new SideEffectStorage(storage, finalizer, eventSink);
this.driver = checkNotNull(driver);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
+ this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
Stats.exportSize("work_queue_depth", workQueue);
}
@@ -334,8 +338,9 @@ public class StateManagerImpl implements StateManager {
switch (work.command) {
case RESCHEDULE:
- ScheduledTask builder =
- Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
+ IScheduledTask ancestor = Iterables.getOnlyElement(taskStore.fetchTasks(idQuery));
+
+ ScheduledTask builder = ancestor.newBuilder();
builder.getAssignedTask().unsetSlaveId();
builder.getAssignedTask().unsetSlaveHost();
builder.getAssignedTask().unsetAssignedPorts();
@@ -351,13 +356,19 @@ public class StateManagerImpl implements StateManager {
IScheduledTask task = IScheduledTask.build(builder);
taskStore.saveTasks(ImmutableSet.of(task));
- createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
- ITaskConfig taskInfo = task.getAssignedTask().getTask();
- sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskRescheduled(
- taskInfo.getOwner().getRole(),
- taskInfo.getJobName(),
- task.getAssignedTask().getInstanceId()));
+ ScheduleStatus newState;
+ String auditMessage;
+ long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(ancestor);
+ if (flapPenaltyMs > 0) {
+ newState = THROTTLED;
+ auditMessage =
+ String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+ } else {
+ newState = PENDING;
+ auditMessage = "Rescheduled";
+ }
+
+ createStateMachine(task).updateState(newState, Optional.of(auditMessage));
break;
case UPDATE_STATE:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..11d283d 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -77,6 +77,7 @@ class TaskStateMachine {
private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
private static final State STARTING = State.create(ScheduleStatus.STARTING);
+ private static final State THROTTLED = State.create(ScheduleStatus.THROTTLED);
private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
@VisibleForTesting
@@ -263,29 +264,23 @@ class TaskStateMachine {
}
};
+ final Closure<Transition<State>> deleteIfKilling =
+ Closures.filter(Transition.to(KILLING), addWorkClosure(WorkCommand.DELETE));
+
stateMachine = StateMachine.<State>builder(taskId)
.logTransitions()
.initialState(State.create(initialState))
.addState(
Rule.from(INIT)
- .to(PENDING, UNKNOWN))
+ .to(PENDING, THROTTLED, UNKNOWN))
.addState(
Rule.from(PENDING)
.to(ASSIGNED, KILLING)
- .withCallback(
- new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- switch (transition.getTo().getState()) {
- case KILLING:
- addWork(WorkCommand.DELETE);
- break;
-
- default:
- // No-op.
- }
- }
- }
- ))
+ .withCallback(deleteIfKilling))
+ .addState(
+ Rule.from(THROTTLED)
+ .to(PENDING, KILLING)
+ .withCallback(deleteIfKilling))
.addState(
Rule.from(ASSIGNED)
.to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
@@ -552,7 +547,7 @@ class TaskStateMachine {
* @param mutation Mutate operation to perform while updating the task.
* @return {@code true} if the state change was allowed, {@code false} otherwise.
*/
- public synchronized boolean updateState(
+ private synchronized boolean updateState(
final ScheduleStatus status,
Function<IScheduledTask, IScheduledTask> mutation,
final Optional<String> auditMessage) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
index 69374ca..3ce5bd3 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -101,24 +101,6 @@ public final class StorageBackfill {
}
}
- private static final AtomicLong BOTH_FIELDS_SET = Stats.exportLong("both_instance_ids_set");
- private static final AtomicLong OLD_FIELD_SET = Stats.exportLong("old_instance_id_set");
- private static final AtomicLong NEW_FIELD_SET = Stats.exportLong("new_instance_id_set");
- private static final AtomicLong FIELDS_INCONSISTENT =
- Stats.exportLong("instance_ids_inconsistent");
-
- /**
- * Ensures backwards-compatibility of the throttled state, which exists in this version but is
- * not handled.
- *
- * @param task Task to possibly rewrite.
- */
- private static void rewriteThrottledState(ScheduledTask task) {
- if (ScheduleStatus.THROTTLED == task.getStatus()) {
- task.setStatus(ScheduleStatus.PENDING);
- }
- }
-
/**
* Backfills the storage to make it match any assumptions that may have changed since
* the structs were first written.
@@ -137,7 +119,6 @@ public final class StorageBackfill {
// TODO(ksweeney): Guarantee tasks pass current validation code here and quarantine if they
// don't.
guaranteeShardUniqueness(builder, storeProvider.getUnsafeTaskStore(), clock);
- rewriteThrottledState(builder);
return IScheduledTask.build(builder);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 33c70df..927552a 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -256,7 +256,8 @@ enum ScheduleStatus {
}
// States that a task may be in while still considered active.
-const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.PENDING,
+const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.THROTTLED,
+ ScheduleStatus.PENDING,
ScheduleStatus.ASSIGNED,
ScheduleStatus.STARTING,
ScheduleStatus.RUNNING,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
new file mode 100644
index 0000000..c450276
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class RescheduleCalculatorImplTest extends EasyMockTest {
+
+ private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES);
+ private static final Amount<Integer, Time> MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES);
+
+ private StorageTestUtil storageUtil;
+ private BackoffStrategy backoff;
+ private RescheduleCalculator rescheduleCalculator;
+
+ @Before
+ public void setUp() {
+ storageUtil = new StorageTestUtil(this);
+ backoff = createMock(BackoffStrategy.class);
+ rescheduleCalculator = new RescheduleCalculatorImpl(
+ storageUtil.storage,
+ new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+ backoff,
+ FLAPPING_THRESHOLD,
+ MAX_STARTUP_DELAY));
+ storageUtil.expectOperations();
+ }
+
+ @Test
+ public void testNoPenaltyForNoAncestor() {
+ control.replay();
+
+ assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT)));
+ }
+
+ @Test
+ public void testNoPenaltyDeletedAncestor() {
+ String ancestorId = "a";
+ storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
+
+ control.replay();
+
+ assertEquals(
+ 0L,
+ rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId)));
+ }
+
+ @Test
+ public void testFlappingTask() {
+ IScheduledTask ancestor = makeFlappyTask("a");
+ storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+ long penaltyMs = 1000L;
+ expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
+
+ control.replay();
+
+ assertEquals(
+ penaltyMs,
+ rescheduleCalculator.getFlappingPenaltyMs(
+ setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+ }
+
+ @Test
+ public void testFlappingTasksBackoffTruncation() {
+ // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting
+ // ancestors once truncated.
+ IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried");
+ IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA));
+ IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB));
+ IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC));
+
+ Map<IScheduledTask, Long> ancestorsAndPenalties = ImmutableMap.of(
+ taskD, 100L,
+ taskC, 200L,
+ taskB, 300L,
+ taskA, 300L);
+
+ long lastPenalty = 0L;
+ for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
+ storageUtil.expectTaskFetch(
+ Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
+ taskAndPenalty.getKey());
+ expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
+ lastPenalty = taskAndPenalty.getValue();
+ }
+
+ control.replay();
+
+ IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD));
+ assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask));
+ }
+
+ @Test
+ public void testNoPenaltyForInterruptedTasks() {
+ IScheduledTask ancestor = setEvents(
+ makeTask("a", KILLED),
+ ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
+ storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+
+ control.replay();
+
+ assertEquals(
+ 0L,
+ rescheduleCalculator.getFlappingPenaltyMs(
+ setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+ }
+
+ private IScheduledTask makeFlappyTask(String taskId) {
+ return setEvents(
+ makeTask(taskId, FINISHED),
+ ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L));
+ }
+
+ private IScheduledTask makeTask(String taskId) {
+ return IScheduledTask.build(new ScheduledTask()
+ .setAssignedTask(new AssignedTask()
+ .setInstanceId(0)
+ .setTaskId(taskId)
+ .setTask(new TaskConfig()
+ .setJobName("job-" + taskId)
+ .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
+ .setEnvironment("env-" + taskId))));
+ }
+
+ private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+ return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status));
+ }
+
+ private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) {
+ return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId));
+ }
+
+ private static final Function<Entry<ScheduleStatus, Long>, TaskEvent> TO_EVENT =
+ new Function<Entry<ScheduleStatus, Long>, TaskEvent>() {
+ @Override public TaskEvent apply(Entry<ScheduleStatus, Long> input) {
+ return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue());
+ }
+ };
+
+ private IScheduledTask setEvents(IScheduledTask task, Map<ScheduleStatus, Long> events) {
+ return IScheduledTask.build(task.newBuilder().setTaskEvents(
+ FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9698f28..4dfac03 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -15,13 +15,10 @@
*/
package org.apache.aurora.scheduler.async;
-import java.util.EnumSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -40,7 +37,6 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
@@ -71,14 +67,11 @@ import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
@@ -148,13 +141,13 @@ public class TaskSchedulerTest extends EasyMockTest {
rateLimiter,
scheduler,
clock,
+ // TODO(wfarner): Use a mock rather than impl here.
new RescheduleCalculatorImpl(
storage,
new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
flappingStrategy,
flappingThreshold,
- Amount.of(5, Time.SECONDS)),
- clock));
+ Amount.of(5, Time.SECONDS))));
}
private Capture<Runnable> expectOffer() {
@@ -590,144 +583,6 @@ public class TaskSchedulerTest extends EasyMockTest {
timeoutCapture.getValue().run();
}
- @Test
- public void testNoPenaltyForNoAncestor() {
- // If a task doesn't have an ancestor there should be no penality for flapping.
- expectAnyMaintenanceCalls();
- IScheduledTask task = makeTask("a1", INIT);
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(1);
- expectTaskScheduled(task);
-
- replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
-
- changeState(task, INIT, PENDING);
-
- first.getValue().run();
- }
-
- @Test
- public void testFlappingTasksBackoffTruncation() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTask("a0", null);
- makeFlappyTask("a1", "a0");
- makeFlappyTask("a2", "a1");
- IScheduledTask taskA3 = IScheduledTask.build(makeTask("a3", INIT).newBuilder()
- .setAncestorId("a2"));
-
- expectOfferDeclineIn(10);
-
- Capture<Runnable> first = expectTaskGroupBackoff(10);
- // The ancestry chain is 3 long, but if the backoff strategy truncates, we don't traverse the
- // entire history.
- expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
- expect(flappingStrategy.calculateBackoffMs(5L)).andReturn(5L);
- Capture<Runnable> flapping = expectTaskRetryIn(10);
-
- expectTaskScheduled(taskA3);
-
- replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA3, INIT, PENDING);
-
- first.getValue().run();
- clock.waitFor(10);
- flapping.getValue().run();
- }
-
- @Test
- public void testFlappingTasks() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTask("a0", null);
- IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
- .setAncestorId("a0"));
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(10);
-
- expect(flappingStrategy.calculateBackoffMs(0)).andReturn(5L);
- // Since A1 has been penalized, the task has to wait for another 10 ms until the penalty has
- // expired.
- Capture<Runnable> flapping = expectTaskRetryIn(10);
-
- expectTaskScheduled(taskA1);
-
- replayAndCreateScheduler();
-
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA1, INIT, PENDING);
-
- first.getValue().run();
- clock.waitFor(10);
- flapping.getValue().run();
- }
-
- @Test
- public void testNoPenaltyForInterruptedTasks() {
- expectAnyMaintenanceCalls();
-
- makeFlappyTaskWithStates("a0", EnumSet.of(INIT, PENDING, ASSIGNED, RESTARTING, FAILED), null);
- IScheduledTask taskA1 = IScheduledTask.build(makeTask("a1", INIT).newBuilder()
- .setAncestorId("a0"));
-
- expectOfferDeclineIn(10);
- Capture<Runnable> first = expectTaskGroupBackoff(10);
-
- expectTaskScheduled(taskA1);
-
- replayAndCreateScheduler();
-
- offerQueue.addOffer(OFFER_A);
-
- changeState(taskA1, INIT, PENDING);
-
- first.getValue().run();
- }
-
- private IScheduledTask makeFlappyTaskWithStates(
- String taskId,
- Iterable<ScheduleStatus> states,
- @Nullable String ancestorId) {
-
- Amount<Long, Time> timeInState = Amount.of(10L, Time.SECONDS);
-
- ScheduledTask base = makeTask(taskId, INIT).newBuilder();
-
- for (ScheduleStatus status : states) {
- base.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
- clock.advance(timeInState);
- }
-
- base.setAncestorId(ancestorId);
-
- final IScheduledTask result = IScheduledTask.build(base);
-
- // Insert the task if it doesn't already exist.
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- if (taskStore.fetchTasks(Query.taskScoped(Tasks.id(result))).isEmpty()) {
- taskStore.saveTasks(ImmutableSet.of(result));
- }
- }
- });
-
- return result;
- }
-
- private IScheduledTask makeFlappyTask(String taskId, @Nullable String ancestorId) {
- return makeFlappyTaskWithStates(
- taskId,
- EnumSet.of(INIT, PENDING, ASSIGNED, RUNNING, FAILED),
- ancestorId);
- }
-
private TaskInfo makeTaskInfo(IScheduledTask task) {
return TaskInfo.newBuilder()
.setName(Tasks.id(task))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
new file mode 100644
index 0000000..66bc2a0
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
+public class TaskThrottlerTest extends EasyMockTest {
+
+ private RescheduleCalculator rescheduleCalculator;
+ private FakeClock clock;
+ private ScheduledExecutorService executor;
+ private StateManager stateManager;
+ private TaskThrottler throttler;
+
+ @Before
+ public void setUp() throws Exception {
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
+ clock = new FakeClock();
+ executor = createMock(ScheduledExecutorService.class);
+ stateManager = createMock(StateManager.class);
+ throttler = new TaskThrottler(rescheduleCalculator, clock, executor, stateManager);
+ }
+
+ @Test
+ public void testIgnoresNonThrottledTasks() {
+ control.replay();
+
+ throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
+ throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING));
+ }
+
+ @Test
+ public void testThrottledTask() {
+ IScheduledTask task = makeTask("a", THROTTLED);
+
+ long penaltyMs = 100;
+
+ expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+ Capture<Runnable> stateChangeCapture = expectThrottled(penaltyMs);
+ expectMovedToPending(task);
+
+ control.replay();
+
+ throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+ stateChangeCapture.getValue().run();
+ }
+
+ @Test
+ public void testThrottledTaskReady() {
+ // Ensures that a sane delay is used when the task's penalty was already expired when
+ // the -> THROTTLED transition occurred (such as in the event of a scheduler failover).
+
+ IScheduledTask task = makeTask("a", THROTTLED);
+
+ long penaltyMs = 100;
+
+ expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+ Capture<Runnable> stateChangeCapture = expectThrottled(0);
+ expectMovedToPending(task);
+
+ control.replay();
+
+ clock.advance(Amount.of(1L, Time.HOURS));
+ throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+ stateChangeCapture.getValue().run();
+ }
+
+ private Capture<Runnable> expectThrottled(long penaltyMs) {
+ Capture<Runnable> stateChangeCapture = createCapture();
+ expect(executor.schedule(
+ capture(stateChangeCapture),
+ eq(penaltyMs),
+ eq(TimeUnit.MILLISECONDS)))
+ .andReturn(null);
+ return stateChangeCapture;
+ }
+
+ private void expectMovedToPending(IScheduledTask task) {
+ expect(stateManager.changeState(
+ Query.taskScoped(Tasks.id(task)).byStatus(THROTTLED),
+ PENDING,
+ Optional.<String>absent()))
+ .andReturn(1);
+ }
+
+ private IScheduledTask makeTask(String id, ScheduleStatus status) {
+ return IScheduledTask.build(new ScheduledTask()
+ .setTaskEvents(ImmutableList.of(
+ new TaskEvent()
+ .setStatus(status)
+ .setTimestamp(clock.nowMillis())))
+ .setStatus(status)
+ .setAssignedTask(new AssignedTask().setTaskId(id)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 720d0c8..cc929f8 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -59,6 +59,7 @@ import org.apache.aurora.gen.TaskQuery;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
@@ -75,7 +76,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.StorageBackfill;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IIdentity;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -83,6 +83,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import org.apache.mesos.Protos.SlaveID;
import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -137,6 +138,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private CronJobManager cron;
private FakeClock clock;
private EventSink eventSink;
+ private RescheduleCalculator rescheduleCalculator;
private ShutdownRegistry shutdownRegistry;
private JobFilter jobFilter;
@@ -154,6 +156,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
clock = new FakeClock();
eventSink = createMock(EventSink.class);
eventSink.post(EasyMock.<PubsubEvent>anyObject());
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
cronScheduler = createMock(CronScheduler.class);
shutdownRegistry = createMock(ShutdownRegistry.class);
jobFilter = createMock(JobFilter.class);
@@ -184,12 +187,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private void buildScheduler(Storage newStorage) throws Exception {
this.storage = newStorage;
storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
StorageBackfill.backfill(storeProvider, clock);
}
});
- stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+ stateManager = new StateManagerImpl(
+ storage,
+ clock,
+ driver,
+ taskIdGenerator,
+ eventSink,
+ rescheduleCalculator);
ImmediateJobManager immediateManager = new ImmediateJobManager(stateManager, storage);
cron = new CronJobManager(stateManager, storage, cronScheduler, shutdownRegistry);
scheduler = new SchedulerCoreImpl(
@@ -412,16 +422,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
}
- @Test
- public void testSortableTaskIds() throws Exception {
- control.replay();
- buildScheduler();
-
- for (IScheduledTask task : getTasks(Query.unscoped())) {
- assertEquals(IIdentity.build(OWNER_A), task.getAssignedTask().getTask().getOwner());
- }
- }
-
@Test(expected = ScheduleException.class)
public void testCreateDuplicateJob() throws Exception {
control.replay();
@@ -577,7 +577,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assertTaskCount(1);
assertEquals(PENDING, getTask(taskId).getStatus());
- changeStatus(Query.taskScoped(taskId), ASSIGNED);
+ changeStatus(taskId, ASSIGNED);
scheduler.startCronJob(KEY_A);
assertTaskCount(2);
@@ -649,20 +649,28 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
}
+ private IExpectationSetters<Long> expectTaskNotThrottled() {
+ return expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+ .andReturn(0L);
+ }
+
@Test
public void testServiceTasksRescheduled() throws Exception {
+ int numServiceTasks = 5;
+
+ expectTaskNotThrottled().times(numServiceTasks);
+
control.replay();
buildScheduler();
// Schedule 5 service and 5 non-service tasks.
- scheduler.createJob(makeJob(KEY_A, 5));
+ scheduler.createJob(makeJob(KEY_A, numServiceTasks));
TaskConfig task = productionTask().setIsService(true);
scheduler.createJob(
makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
assertEquals(10, getTasksByStatus(PENDING).size());
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
assertEquals(10, getTasksByStatus(STARTING).size());
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
@@ -683,11 +691,14 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testServiceTaskIgnoresMaxFailures() throws Exception {
+ int totalFailures = 10;
+
+ expectTaskNotThrottled().times(totalFailures);
+
control.replay();
buildScheduler();
int maxFailures = 5;
- int totalFailures = 10;
// Schedule a service task.
TaskConfig task = productionTask()
@@ -701,9 +712,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
String taskId = Tasks.id(
getOnlyTask(Query.jobScoped(KEY_A).active()));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
assertEquals(i - 1, getTask(taskId).getFailureCount());
changeStatus(taskId, FAILED);
@@ -718,27 +727,26 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testTaskRescheduleOnKill() throws Exception {
+ int numTasks = 5;  // Size of the job; every task is expected to be rescheduled unthrottled.
+
+ expectTaskNotThrottled().times(numTasks);
+
control.replay();
buildScheduler();
- // Create 5 non-service and 5 service tasks.
- scheduler.createJob(makeJob(KEY_A, 5));
- TaskConfig task = productionTask().setIsService(true);
- scheduler.createJob(
- makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+ scheduler.createJob(makeJob(KEY_A, numTasks));
- assertEquals(10, getTasksByStatus(PENDING).size());
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
- assertEquals(10, getTasksByStatus(STARTING).size());
+ assertEquals(numTasks, getTasksByStatus(PENDING).size());
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
+ assertEquals(numTasks, getTasksByStatus(STARTING).size());
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
- assertEquals(10, getTasksByStatus(RUNNING).size());
+ assertEquals(numTasks, getTasksByStatus(RUNNING).size());
// All tasks will move back into PENDING state after getting KILLED.
changeStatus(Query.roleScoped(ROLE_A), KILLED);
Set<IScheduledTask> newTasks = getTasksByStatus(PENDING);
- assertEquals(10, newTasks.size());
- assertEquals(10, getTasksByStatus(KILLED).size());
+ assertEquals(numTasks, newTasks.size());
+ assertEquals(numTasks, getTasksByStatus(KILLED).size());
}
@Test
@@ -749,9 +757,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
buildScheduler();
scheduler.createJob(makeJob(KEY_A, 1));
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
- changeStatus(Query.roleScoped(ROLE_A), RUNNING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING, RUNNING);
scheduler.killTasks(Query.roleScoped(ROLE_A), OWNER_A.getUser());
changeStatus(Query.roleScoped(ROLE_A), KILLED);
@@ -765,6 +771,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testFailedTaskIncrementsFailureCount() throws Exception {
int maxFailures = 5;
+ expectTaskNotThrottled().times(maxFailures - 1);
+
control.replay();
buildScheduler();
@@ -778,9 +786,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
String taskId = Tasks.id(getOnlyTask(
Query.jobScoped(KEY_A).active()));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
assertEquals(i - 1, getTask(taskId).getFailureCount());
changeStatus(taskId, FAILED);
@@ -819,8 +825,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assertTaskCount(10);
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED);
- changeStatus(Query.roleScoped(ROLE_A), STARTING);
+ changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
assertTaskCount(10);
changeStatus(Query.roleScoped(ROLE_A), RUNNING);
assertTaskCount(10);
@@ -895,9 +900,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.createJob(makeJob(KEY_A, 1));
String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
- changeStatus(taskId, RUNNING);
+ changeStatus(taskId, ASSIGNED, STARTING, RUNNING);
scheduler.killTasks(Query.taskScoped(taskId), OWNER_A.getUser());
assertEquals(KILLING, getTask(taskId).getStatus());
assertEquals(1, getTasks(Query.roleScoped(ROLE_A)).size());
@@ -925,29 +928,25 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testLostTaskRescheduled() throws Exception {
expectKillTask(2);
+ expectTaskNotThrottled().times(2);
control.replay();
buildScheduler();
- int maxFailures = 5;
- TaskConfig task = productionTask().setMaxTaskFailures(maxFailures);
- scheduler.createJob(makeJob(KEY_A, task, 1));
+ scheduler.createJob(makeJob(KEY_A, 1));
assertTaskCount(1);
Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.jobScoped(KEY_A));
+ String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
assertEquals(1, tasks.size());
- changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
+ changeStatus(taskId, ASSIGNED, LOST);
- Query.Builder pendingQuery = Query.unscoped().byStatus(PENDING);
- changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
- assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
- assertTaskCount(2);
+ String newTaskId = Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)));
+ assertFalse(newTaskId.equals(taskId));
- changeStatus(Query.unscoped().byStatus(PENDING), ASSIGNED);
- changeStatus(Query.unscoped().byStatus(ASSIGNED), LOST);
- assertEquals(PENDING, getOnlyTask(pendingQuery).getStatus());
- assertTaskCount(3);
+ changeStatus(newTaskId, ASSIGNED, LOST);
+ assertFalse(newTaskId.equals(Tasks.id(getOnlyTask(Query.unscoped().byStatus(PENDING)))));
}
@Test
@@ -1017,6 +1016,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testSlaveDeletesTasks() throws Exception {
+ expectTaskNotThrottled();
+
control.replay();
buildScheduler();
@@ -1029,10 +1030,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
assignTask(taskId1, SLAVE_ID, SLAVE_HOST_1);
assignTask(taskId2, SLAVE_ID, SLAVE_HOST_1);
- changeStatus(taskId1, STARTING);
- changeStatus(taskId1, RUNNING);
- changeStatus(taskId2, STARTING);
- changeStatus(taskId2, FINISHED);
+ changeStatus(taskId1, STARTING, RUNNING);
+ changeStatus(taskId2, STARTING, FINISHED);
scheduler.tasksDeleted(ImmutableSet.of(taskId1, taskId2));
@@ -1049,13 +1048,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testRestartShards() throws Exception {
expectKillTask(2);
+ expectTaskNotThrottled().times(2);
control.replay();
buildScheduler();
scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
- changeStatus(Query.jobScoped(KEY_A), RUNNING);
+ changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
@@ -1065,12 +1064,13 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testRestartNonexistentShard() throws Exception {
+ expectTaskNotThrottled();
+
control.replay();
buildScheduler();
scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED);
- changeStatus(Query.jobScoped(KEY_A), FINISHED);
+ changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);
scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);
}
@@ -1107,6 +1107,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testPortResourceResetAfterReschedule() throws Exception {
expectKillTask(1);
+ expectTaskNotThrottled();
control.replay();
buildScheduler();
@@ -1139,8 +1140,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.createJob(makeJob(KEY_A, 1));
String taskId = Tasks.id(getOnlyTask(Query.roleScoped(ROLE_A)));
- changeStatus(taskId, ASSIGNED);
- changeStatus(taskId, STARTING);
+ changeStatus(taskId, ASSIGNED, STARTING);
changeStatus(taskId, FAILED, Optional.of("bad stuff happened"));
String hostname = getLocalHost();
@@ -1456,12 +1456,16 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.setTaskStatus(query, status, message);
}
- public void changeStatus(Query.Builder query, ScheduleStatus status) {
- changeStatus(query, status, Optional.<String>absent());
+ public void changeStatus(Query.Builder query, ScheduleStatus status, ScheduleStatus... statuses) {
+ for (ScheduleStatus nextStatus
+ : ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
+
+ changeStatus(query, nextStatus, Optional.<String>absent());
+ }
}
- public void changeStatus(String taskId, ScheduleStatus status) {
- changeStatus(taskId, status, Optional.<String>absent());
+ public void changeStatus(String taskId, ScheduleStatus status, ScheduleStatus... statuses) {
+ changeStatus(Query.taskScoped(taskId), status, statuses);
}
public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b17b983..74ec74f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -36,6 +36,7 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
@@ -55,10 +56,12 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
import static org.easymock.EasyMock.expect;
@@ -75,6 +78,7 @@ public class StateManagerImplTest extends EasyMockTest {
private Driver driver;
private TaskIdGenerator taskIdGenerator;
private EventSink eventSink;
+ private RescheduleCalculator rescheduleCalculator;
private StateManagerImpl stateManager;
private final FakeClock clock = new FakeClock();
private Storage storage;
@@ -84,9 +88,16 @@ public class StateManagerImplTest extends EasyMockTest {
taskIdGenerator = createMock(TaskIdGenerator.class);
driver = createMock(Driver.class);
eventSink = createMock(EventSink.class);
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
// TODO(William Farner): Use a mocked storage.
storage = MemStorage.newEmptyStorage();
- stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
+ stateManager = new StateManagerImpl(
+ storage,
+ clock,
+ driver,
+ taskIdGenerator,
+ eventSink,
+ rescheduleCalculator);
}
@After
@@ -224,7 +235,8 @@ public class StateManagerImplTest extends EasyMockTest {
// Trigger an event that produces a side-effect and a PubSub event.
eventSink.post(matchStateChange(id, INIT, PENDING));
expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override public Void answer() throws Throwable {
+ @Override
+ public Void answer() throws Throwable {
stateManager.changeState(
Query.unscoped(), ScheduleStatus.ASSIGNED, Optional.<String>absent());
return null;
@@ -253,6 +265,26 @@ public class StateManagerImplTest extends EasyMockTest {
stateManager.deleteTasks(ImmutableSet.of(taskId));
}
+ @Test
+ public void testThrottleTask() {
+ ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+ String taskId = "a";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+ expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+ String newTaskId = "b";
+ expect(taskIdGenerator.generate(task, 0)).andReturn(newTaskId);
+ expect(rescheduleCalculator.getFlappingPenaltyMs(EasyMock.<IScheduledTask>anyObject()))
+ .andReturn(100L);
+ expectStateTransitions(newTaskId, INIT, THROTTLED);
+
+ control.replay();
+
+ insertTask(task, 0);
+ changeState(taskId, ASSIGNED);
+ changeState(taskId, RUNNING);
+ changeState(taskId, FAILED);
+ }
+
private void expectStateTransitions(
String taskId,
ScheduleStatus initial,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index e89e60a..f44ee58 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -46,12 +46,14 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
import static org.apache.aurora.scheduler.state.WorkCommand.DELETE;
import static org.apache.aurora.scheduler.state.WorkCommand.INCREMENT_FAILURES;
import static org.apache.aurora.scheduler.state.WorkCommand.KILL;
import static org.apache.aurora.scheduler.state.WorkCommand.RESCHEDULE;
import static org.apache.aurora.scheduler.state.WorkCommand.UPDATE_STATE;
+
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
import static org.hamcrest.CoreMatchers.is;
@@ -315,6 +317,15 @@ public class TaskStateMachineTest extends EasyMockTest {
transition(stateMachine, PENDING, ASSIGNED, STARTING, RUNNING, KILLING, KILLED);
}
+ @Test
+ public void testThrottledTask() {
+ expectWork(UPDATE_STATE).times(2);
+
+ control.replay();
+
+ transition(stateMachine, THROTTLED, PENDING);
+ }
+
private static void transition(TaskStateMachine stateMachine, ScheduleStatus... states) {
for (ScheduleStatus status : states) {
stateMachine.updateState(status);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a584410c/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
deleted file mode 100644
index 724188b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.LimitConstraint;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskConstraint;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class StorageBackfillTest {
-
- private Storage storage;
- private FakeClock clock;
-
- @Before
- public void setUp() {
- storage = MemStorage.newEmptyStorage();
- clock = new FakeClock();
- }
-
- private static IScheduledTask makeTask(String id, int instanceId) {
-
- TaskConfig config = new TaskConfig()
- .setOwner(new Identity("user", "role"))
- .setEnvironment("test")
- .setJobName("jobName")
- .setProduction(false)
- .setConstraints(ImmutableSet.of(
- new Constraint("host", TaskConstraint.limit(new LimitConstraint(1)))))
- .setRequestedPorts(ImmutableSet.<String>of())
- .setMaxTaskFailures(1)
- .setTaskLinks(ImmutableMap.<String, String>of());
- ScheduledTask task = new ScheduledTask().setAssignedTask(
- new AssignedTask().setTask(config));
- task.getAssignedTask().setTaskId(id);
- task.getAssignedTask().setInstanceId(instanceId);
- return IScheduledTask.build(task);
- }
-
- @Test
- public void testRewriteThrottledState() {
- final IScheduledTask savedTask =
- IScheduledTask.build(makeTask("id", 0).newBuilder().setStatus(ScheduleStatus.THROTTLED));
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(savedTask));
- StorageBackfill.backfill(storeProvider, clock);
- }
- });
-
- assertEquals(
- IScheduledTask.build(savedTask.newBuilder().setStatus(ScheduleStatus.PENDING)),
- getTask("id"));
- }
-
- private IScheduledTask getTask(final String id) {
- return Iterables.getOnlyElement(
- Storage.Util.consistentFetchTasks(storage, Query.taskScoped(id)));
- }
-}