You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/04/08 01:25:29 UTC
git commit: Remove duplicate task throttling behavior from TaskGroups,
fix a few race conditions.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 589fa44c1 -> f73e29b31
Remove duplicate task throttling behavior from TaskGroups, fix a few race conditions.
Bugs closed: AURORA-302
Reviewed at https://reviews.apache.org/r/20066/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f73e29b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f73e29b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f73e29b3
Branch: refs/heads/master
Commit: f73e29b3196f13e37d31c8e02dd17628edb548e0
Parents: 589fa44
Author: Bill Farner <wf...@apache.org>
Authored: Mon Apr 7 16:22:50 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Apr 7 16:22:50 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/async/TaskGroup.java | 112 ++++-----------
.../aurora/scheduler/async/TaskGroups.java | 135 ++++++++-----------
.../aurora/scheduler/async/TaskScheduler.java | 23 ++--
.../aurora/scheduler/async/TaskGroupsTest.java | 130 ++++++++++++++++++
.../scheduler/async/TaskSchedulerImplTest.java | 30 ++---
.../scheduler/async/TaskSchedulerTest.java | 28 ++--
6 files changed, 241 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
index ece7e3a..0aa54cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
@@ -17,16 +17,10 @@ package org.apache.aurora.scheduler.async;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.twitter.common.base.Function;
-import com.twitter.common.util.BackoffStrategy;
+import com.google.common.collect.Lists;
import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
@@ -35,109 +29,51 @@ import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
*/
class TaskGroup {
private final GroupKey key;
- private final BackoffStrategy backoffStrategy;
+ private long penaltyMs;
+ private final Queue<String> tasks;
- private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
- @Override
- public Long apply(Task item) {
- return item.readyTimestampMs;
- }
- };
-
- // Order the tasks by the time they are ready to be scheduled
- private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
- // 11 is the magic number used by PriorityBlockingQueue as the initial size.
- private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
- // Penalty for the task group for failing to schedule.
- private final AtomicLong penaltyMs;
-
- TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
+ TaskGroup(GroupKey key, String initialTaskId) {
this.key = key;
- this.backoffStrategy = backoffStrategy;
- penaltyMs = new AtomicLong();
- resetPenaltyAndGet();
+ this.penaltyMs = 0;
+ this.tasks = Lists.newLinkedList();
+ this.tasks.add(initialTaskId);
}
- GroupKey getKey() {
+ synchronized GroupKey getKey() {
return key;
}
- private static final Function<Task, String> TO_TASK_ID =
- new Function<Task, String>() {
- @Override
- public String apply(Task item) {
- return item.taskId;
- }
- };
-
- /**
- * Removes the task at the head of the queue.
- *
- * @return String the id of the head task.
- * @throws IllegalStateException if the queue is empty.
- */
- String pop() throws IllegalStateException {
- Task head = tasks.poll();
- Preconditions.checkState(head != null);
- return head.taskId;
+ synchronized Optional<String> peek() {
+ return Optional.fromNullable(tasks.peek());
}
- void remove(String taskId) {
- Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
+ synchronized boolean hasMore() {
+ return !tasks.isEmpty();
}
- void push(final String taskId, long readyTimestamp) {
- tasks.offer(new Task(taskId, readyTimestamp));
+ synchronized void remove(String taskId) {
+ tasks.remove(taskId);
}
- synchronized long resetPenaltyAndGet() {
- penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
- return getPenaltyMs();
+ synchronized void offer(String taskId) {
+ tasks.offer(taskId);
}
- synchronized long penalizeAndGet() {
- penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
- return getPenaltyMs();
+ synchronized void setPenaltyMs(long penaltyMs) {
+ this.penaltyMs = penaltyMs;
}
- GroupState isReady(long nowMs) {
- Task task = tasks.peek();
- if (task == null) {
- return GroupState.EMPTY;
- }
-
- if (task.readyTimestampMs > nowMs) {
- return GroupState.NOT_READY;
- }
- return GroupState.READY;
- }
// Begin methods used for debug interfaces.
- public String getName() {
+ public synchronized String getName() {
return key.toString();
}
- public Set<String> getTaskIds() {
- return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
- }
-
- public long getPenaltyMs() {
- return penaltyMs.get();
- }
-
- private static class Task {
- private final String taskId;
- private final long readyTimestampMs;
-
- Task(String taskId, long readyTimestampMs) {
- this.taskId = Preconditions.checkNotNull(taskId);
- this.readyTimestampMs = readyTimestampMs;
- }
+ public synchronized Set<String> getTaskIds() {
+ return ImmutableSet.copyOf(tasks);
}
- enum GroupState {
- EMPTY, // The group is empty.
- NOT_READY, // Every task in the group is not ready yet.
- READY // The task at the head of the queue is ready.
+ public synchronized long getPenaltyMs() {
+ return penaltyMs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 6c95c6f..6aff091 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -15,20 +15,18 @@
*/
package org.apache.aurora.scheduler.async;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,10 +48,11 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.TaskGroup.GroupState;
/**
* A collection of task groups, where a task group is a collection of tasks that are known to be
@@ -66,10 +65,10 @@ import static org.apache.aurora.scheduler.async.TaskGroup.GroupState;
*/
public class TaskGroups implements EventSubscriber {
- private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
- private final LoadingCache<GroupKey, TaskGroup> groups;
- private final Clock clock;
+ private final ConcurrentMap<GroupKey, TaskGroup> groups = Maps.newConcurrentMap();
+ private final ScheduledExecutorService executor;
+ private final TaskScheduler taskScheduler;
+ private final BackoffStrategy backoff;
private final RescheduleCalculator rescheduleCalculator;
static class TaskGroupsSettings {
@@ -95,98 +94,64 @@ public class TaskGroups implements EventSubscriber {
settings.taskGroupBackoff,
settings.rateLimiter,
taskScheduler,
- clock,
rescheduleCalculator);
}
@VisibleForTesting
TaskGroups(
final ScheduledExecutorService executor,
- final BackoffStrategy taskGroupBackoffStrategy,
+ final BackoffStrategy backoff,
final RateLimiter rateLimiter,
final TaskScheduler taskScheduler,
- final Clock clock,
final RescheduleCalculator rescheduleCalculator) {
- checkNotNull(executor);
- checkNotNull(taskGroupBackoffStrategy);
+ this.executor = checkNotNull(executor);
checkNotNull(rateLimiter);
- checkNotNull(taskScheduler);
- this.clock = checkNotNull(clock);
+ this.taskScheduler = checkNotNull(taskScheduler);
+ this.backoff = checkNotNull(backoff);
this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
@Override
- public TaskSchedulerResult schedule(String taskId) {
+ public boolean schedule(String taskId) {
rateLimiter.acquire();
return taskScheduler.schedule(taskId);
}
};
-
- groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
- @Override
- public TaskGroup load(GroupKey key) {
- TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
- LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
- startGroup(group, executor, ratelLimitedScheduler);
- return group;
- }
- });
}
- private synchronized boolean maybeInvalidate(TaskGroup group) {
- if (group.getTaskIds().isEmpty()) {
- groups.invalidate(group.getKey());
- return true;
+ private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
+ // Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could
+ // remove a group while a task is being added to it.
+ if (group.hasMore()) {
+ executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
+ } else {
+ groups.remove(group.getKey());
}
- return false;
}
- private void startGroup(
- final TaskGroup group,
- final ScheduledExecutorService executor,
- final TaskScheduler taskScheduler) {
-
+ private void startGroup(final TaskGroup group) {
Runnable monitor = new Runnable() {
@Override
public void run() {
- GroupState state = group.isReady(clock.nowMillis());
-
- switch (state) {
- case EMPTY:
- maybeInvalidate(group);
- break;
-
- case READY:
- String id = group.pop();
- TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
- switch (result) {
- case SUCCESS:
- if (!maybeInvalidate(group)) {
- executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
- }
- break;
-
- case TRY_AGAIN:
- group.push(id, clock.nowMillis());
- executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
- break;
-
- default:
- throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
+ Optional<String> taskId = group.peek();
+ long penaltyMs = 0;
+ if (taskId.isPresent()) {
+ if (taskScheduler.schedule(taskId.get())) {
+ group.remove(taskId.get());
+ if (group.hasMore()) {
+ penaltyMs = backoff.calculateBackoffMs(0);
}
- break;
-
- case NOT_READY:
- executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
- break;
-
- default:
- throw new IllegalStateException("Unknown GroupState " + state);
+ } else {
+ penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
+ }
}
+
+ group.setPenaltyMs(penaltyMs);
+ evaluateGroupLater(this, group);
}
};
- executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
+ evaluateGroupLater(monitor, group);
}
private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
@@ -205,11 +170,6 @@ public class TaskGroups implements EventSubscriber {
return executor;
}
- private synchronized void add(IAssignedTask task, long scheduleDelayMs) {
- groups.getUnchecked(new GroupKey(task.getTask()))
- .push(task.getTaskId(), clock.nowMillis() + scheduleDelayMs);
- }
-
/**
* Informs the task groups of a task state change.
* <p>
@@ -222,10 +182,21 @@ public class TaskGroups implements EventSubscriber {
public synchronized void taskChangedState(TaskStateChange stateChange) {
if (stateChange.getNewState() == PENDING) {
IScheduledTask task = stateChange.getTask();
- long readyAtMs = stateChange.isTransition()
- ? rescheduleCalculator.getFlappingPenaltyMs(task)
- : rescheduleCalculator.getStartupScheduleDelayMs(task);
- add(task.getAssignedTask(), readyAtMs);
+ GroupKey key = new GroupKey(task.getAssignedTask().getTask());
+ TaskGroup newGroup = new TaskGroup(key, Tasks.id(task));
+ TaskGroup existing = groups.putIfAbsent(key, newGroup);
+ if (existing == null) {
+ long penaltyMs;
+ if (stateChange.isTransition()) {
+ penaltyMs = backoff.calculateBackoffMs(0);
+ } else {
+ penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
+ }
+ newGroup.setPenaltyMs(penaltyMs);
+ startGroup(newGroup);
+ } else {
+ existing.offer(Tasks.id(task));
+ }
}
}
@@ -238,7 +209,7 @@ public class TaskGroups implements EventSubscriber {
public synchronized void tasksDeleted(TasksDeleted deleted) {
for (IAssignedTask task
: Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
- TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
+ TaskGroup group = groups.get(new GroupKey(task.getTask()));
if (group != null) {
group.remove(task.getTaskId());
}
@@ -246,7 +217,7 @@ public class TaskGroups implements EventSubscriber {
}
public Iterable<TaskGroup> getGroups() {
- return ImmutableSet.copyOf(groups.asMap().values());
+ return ImmutableSet.copyOf(groups.values());
}
static class GroupKey {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 263235c..f7f418a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -79,15 +79,10 @@ interface TaskScheduler extends EventSubscriber {
* Attempts to schedule a task, possibly performing irreversible actions.
*
* @param taskId The task to attempt to schedule.
- * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
- * again if TRY_AGAIN is returned.
+ * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
+ * call schedule again if {@code false} is returned.
*/
- TaskSchedulerResult schedule(String taskId);
-
- enum TaskSchedulerResult {
- SUCCESS,
- TRY_AGAIN
- }
+ boolean schedule(String taskId);
/**
* An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
@@ -183,12 +178,12 @@ interface TaskScheduler extends EventSubscriber {
@Timed("task_schedule_attempt")
@Override
- public TaskSchedulerResult schedule(final String taskId) {
+ public boolean schedule(final String taskId) {
scheduleAttemptsFired.incrementAndGet();
try {
- return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
+ return storage.write(new MutateWork.Quiet<Boolean>() {
@Override
- public TaskSchedulerResult apply(MutableStoreProvider store) {
+ public Boolean apply(MutableStoreProvider store) {
LOG.fine("Attempting to schedule task " + taskId);
final IScheduledTask task = Iterables.getOnlyElement(
store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
@@ -202,7 +197,7 @@ interface TaskScheduler extends EventSubscriber {
if (!offerQueue.launchFirst(getAssignerFunction(aggregate, taskId, task))) {
// Task could not be scheduled.
maybePreemptFor(taskId, aggregate);
- return TaskSchedulerResult.TRY_AGAIN;
+ return false;
}
} catch (OfferQueue.LaunchException e) {
LOG.log(Level.WARNING, "Failed to launch task.", e);
@@ -217,7 +212,7 @@ interface TaskScheduler extends EventSubscriber {
}
}
- return TaskSchedulerResult.SUCCESS;
+ return true;
}
});
} catch (RuntimeException e) {
@@ -225,7 +220,7 @@ interface TaskScheduler extends EventSubscriber {
// if there is a transient issue resulting in an unchecked exception.
LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
scheduleAttemptsFailed.incrementAndGet();
- return TaskSchedulerResult.TRY_AGAIN;
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
new file mode 100644
index 0000000..e23ab5c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class TaskGroupsTest extends EasyMockTest {
+
+ private ScheduledExecutorService executor;
+ private BackoffStrategy backoffStrategy;
+ private TaskScheduler taskScheduler;
+ private RescheduleCalculator rescheduleCalculator;
+
+ private TaskGroups taskGroups;
+
+ @Before
+ public void setUp() throws Exception {
+ executor = createMock(ScheduledExecutorService.class);
+ backoffStrategy = createMock(BackoffStrategy.class);
+ taskScheduler = createMock(TaskScheduler.class);
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
+ taskGroups = new TaskGroups(
+ executor,
+ backoffStrategy,
+ RateLimiter.create(10000),
+ taskScheduler,
+ rescheduleCalculator);
+ }
+
+ @Test
+ public void testEvaluatedImmediately() {
+ expect(backoffStrategy.calculateBackoffMs(0)).andReturn(0L);
+ executor.schedule(EasyMock.<Runnable>anyObject(), EasyMock.eq(0L), EasyMock.eq(MILLISECONDS));
+ expectLastCall().andAnswer(new IAnswer<ScheduledFuture<Void>>() {
+ @Override
+ public ScheduledFuture<Void> answer() {
+ ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+ return null;
+ }
+ });
+ expect(taskScheduler.schedule("a")).andReturn(true);
+
+ control.replay();
+
+ taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT));
+ }
+
+ private Capture<Runnable> expectEvaluate() {
+ Capture<Runnable> capture = createCapture();
+ executor.schedule(EasyMock.capture(capture), EasyMock.eq(0L), EasyMock.eq(MILLISECONDS));
+ expectLastCall().andReturn(null);
+ return capture;
+ }
+
+ @Test
+ public void testTaskDeletedBeforeEvaluating() {
+ final IScheduledTask task = makeTask("a");
+
+ expect(backoffStrategy.calculateBackoffMs(0)).andReturn(0L).atLeastOnce();
+ Capture<Runnable> evaluate = expectEvaluate();
+
+ expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() {
+ // Test a corner case where a task is deleted while it is being evaluated by the task
+ // scheduler. If not handled carefully, this could result in the scheduler trying again
+ // later to satisfy the deleted task.
+ taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
+
+ return false;
+ }
+ });
+
+ control.replay();
+
+ taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT));
+ evaluate.getValue().run();
+ }
+
+ private static IScheduledTask makeTask(String id) {
+ return IScheduledTask.build(new ScheduledTask()
+ .setStatus(ScheduleStatus.PENDING)
+ .setAssignedTask(new AssignedTask()
+ .setTaskId(id)
+ .setTask(new TaskConfig()
+ .setOwner(new Identity("owner", "owner"))
+ .setEnvironment("test")
+ .setJobName("job"))));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 65e00f7..5bcd7a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -56,11 +56,11 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.SUCCESS;
-import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.TRY_AGAIN;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class TaskSchedulerImplTest extends EasyMockTest {
@@ -150,14 +150,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertEquals(TRY_AGAIN, scheduler.schedule("a"));
- assertEquals(TRY_AGAIN, scheduler.schedule("b"));
+ assertFalse(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule("b"));
assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(OFFER));
clock.advance(reservationDuration);
- assertEquals(SUCCESS, scheduler.schedule("b"));
+ assertTrue(scheduler.schedule("b"));
assertEquals(true, secondAssignment.getValue().apply(OFFER).isPresent());
}
@@ -187,12 +187,12 @@ public class TaskSchedulerImplTest extends EasyMockTest {
.andReturn(Optional.of(TaskInfo.getDefaultInstance()));
control.replay();
- assertEquals(TRY_AGAIN, scheduler.schedule("a"));
- assertEquals(SUCCESS, scheduler.schedule("a"));
+ assertFalse(scheduler.schedule("a"));
+ assertTrue(scheduler.schedule("a"));
firstAssignment.getValue().apply(OFFER);
eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
clock.advance(halfReservationDuration);
- assertEquals(SUCCESS, scheduler.schedule("b"));
+ assertTrue(scheduler.schedule("b"));
secondAssignment.getValue().apply(OFFER);
}
@@ -212,9 +212,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAssigned(TASK_A);
control.replay();
- assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+ assertFalse(scheduler.schedule("a"));
clock.advance(halfReservationDuration);
- assertEquals(SUCCESS, scheduler.schedule("a"));
+ assertTrue(scheduler.schedule("a"));
firstAssignment.getValue().apply(OFFER);
}
@@ -237,11 +237,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAssigned(TASK_B);
control.replay();
- assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+ assertFalse(scheduler.schedule("a"));
clock.advance(halfReservationDuration);
// Task is killed by user before it is scheduled
eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
- assertEquals(SUCCESS, scheduler.schedule("b"));
+ assertTrue(scheduler.schedule("b"));
assignment.getValue().apply(OFFER);
}
@@ -262,10 +262,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAssigned(TASK_A);
control.replay();
- assertEquals(TRY_AGAIN, scheduler.schedule("b"));
+ assertFalse(scheduler.schedule("b"));
// We don't act on the reservation made by b because we want to see timeout behaviour.
clock.advance(reservationDuration);
- assertEquals(SUCCESS, scheduler.schedule("a"));
+ assertTrue(scheduler.schedule("a"));
firstAssignment.getValue().apply(OFFER);
}
@@ -298,7 +298,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertEquals(SUCCESS, scheduler.schedule(Tasks.id(taskA)));
+ assertTrue(scheduler.schedule(Tasks.id(taskA)));
assignment.getValue().apply(OFFER);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f73e29b3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index ce03abc..bf1391e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -41,7 +41,6 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
@@ -106,7 +105,7 @@ public class TaskSchedulerTest extends EasyMockTest {
private OfferQueue offerQueue;
private TaskGroups taskGroups;
private FakeClock clock;
- private BackoffStrategy flappingStrategy;
+ private RescheduleCalculator rescheduleCalculator;
private Preemptor preemptor;
private AttributeAggregate emptyJob;
private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
@@ -124,7 +123,7 @@ public class TaskSchedulerTest extends EasyMockTest {
returnDelay = createMock(OfferReturnDelay.class);
clock = new FakeClock();
clock.setNowMillis(0);
- flappingStrategy = createMock(BackoffStrategy.class);
+ rescheduleCalculator = createMock(RescheduleCalculator.class);
preemptor = createMock(Preemptor.class);
emptyJob = new AttributeAggregate(
Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -134,8 +133,6 @@ public class TaskSchedulerTest extends EasyMockTest {
private void replayAndCreateScheduler() {
control.replay();
offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenance);
- RateLimiter rateLimiter = RateLimiter.create(1);
- Amount<Long, Time> flappingThreshold = Amount.of(5L, Time.MINUTES);
TaskScheduler scheduler = new TaskSchedulerImpl(storage,
stateManager,
assigner,
@@ -146,16 +143,9 @@ public class TaskSchedulerTest extends EasyMockTest {
taskGroups = new TaskGroups(
executor,
retryStrategy,
- rateLimiter,
+ RateLimiter.create(100),
scheduler,
- clock,
- // TODO(wfarner): Use a mock rather than impl here.
- new RescheduleCalculatorImpl(
- storage,
- new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
- flappingStrategy,
- flappingThreshold,
- Amount.of(5, Time.SECONDS))));
+ rescheduleCalculator);
}
private Capture<Runnable> expectOffer() {
@@ -250,13 +240,15 @@ public class TaskSchedulerTest extends EasyMockTest {
@Test
public void testLoadFromStorage() {
- expectTaskGroupBackoff(10);
-
- replayAndCreateScheduler();
-
final IScheduledTask a = makeTask("a", KILLED);
final IScheduledTask b = makeTask("b", PENDING);
final IScheduledTask c = makeTask("c", RUNNING);
+
+ expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
+ expectTaskRetryIn(10);
+
+ replayAndCreateScheduler();
+
storage.write(new MutateWork.NoResult.Quiet() {
@Override
protected void execute(MutableStoreProvider store) {