You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/03 20:47:14 UTC
[1/2] aurora git commit: Centralizing offer/task matching in
TaskAssigner.
Repository: aurora
Updated Branches:
refs/heads/master 1c0086ffc -> fb0325065
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
deleted file mode 100644
index 9a91e63..0000000
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
+++ /dev/null
@@ -1,671 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.scheduling;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.offers.Offers;
-import org.apache.aurora.scheduler.preemptor.BiCache;
-import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.apache.mesos.Protos.Offer;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.junit.Assert.assertEquals;
-
-/**
- * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl.
- */
-public class TaskSchedulerTest extends EasyMockTest {
-
- private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
-
- private static final HostOffer OFFER_A = makeOffer("OFFER_A", "HOST_A", NONE);
- private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
- private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
- private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
- private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue();
- private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue();
- private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue();
-
- private Storage storage;
-
- private MaintenanceController maintenance;
- private StateManager stateManager;
- private TaskAssigner assigner;
- private BackoffStrategy retryStrategy;
- private Driver driver;
- private ScheduledExecutorService executor;
- private ScheduledFuture<?> future;
- private OfferReturnDelay returnDelay;
- private OfferManager offerManager;
- private TaskGroups taskGroups;
- private RescheduleCalculator rescheduleCalculator;
- private Preemptor preemptor;
- private BiCache<String, TaskGroupKey> reservations;
-
- @Before
- public void setUp() {
- storage = DbUtil.createStorage();
- maintenance = createMock(MaintenanceController.class);
- stateManager = createMock(StateManager.class);
- assigner = createMock(TaskAssigner.class);
- retryStrategy = createMock(BackoffStrategy.class);
- driver = createMock(Driver.class);
- executor = createMock(ScheduledExecutorService.class);
- future = createMock(ScheduledFuture.class);
- returnDelay = createMock(OfferReturnDelay.class);
- rescheduleCalculator = createMock(RescheduleCalculator.class);
- preemptor = createMock(Preemptor.class);
- reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
- }
-
- private void replayAndCreateScheduler() {
- control.replay();
- offerManager = new OfferManagerImpl(driver, returnDelay, executor);
- TaskScheduler scheduler = new TaskSchedulerImpl(storage,
- stateManager,
- assigner,
- offerManager,
- preemptor,
- reservations);
- taskGroups = new TaskGroups(
- executor,
- Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
- retryStrategy,
- RateLimiter.create(100),
- scheduler,
- rescheduleCalculator);
- }
-
- private Capture<Runnable> expectOffer() {
- return expectOfferDeclineIn(10);
- }
-
- private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
- expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
- Capture<Runnable> runnable = createCapture();
- executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
- expectLastCall().andReturn(createMock(ScheduledFuture.class));
- return runnable;
- }
-
- private void changeState(
- IScheduledTask task,
- ScheduleStatus oldState,
- ScheduleStatus newState) {
-
- final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState));
- // Insert the task if it doesn't already exist.
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- public void execute(MutableStoreProvider storeProvider) {
- TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
- if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) {
- taskStore.saveTasks(ImmutableSet.of(copy));
- }
- }
- });
- taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
- }
-
- private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
- Capture<Runnable> capture = createCapture();
- executor.schedule(
- capture(capture),
- eq(penaltyMs),
- eq(TimeUnit.MILLISECONDS));
- expectLastCall().andReturn(future);
- return capture;
- }
-
- private Capture<Runnable> expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) {
- expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs);
- return expectTaskRetryIn(nextPenaltyMs);
- }
-
- @Test
- public void testNoTasks() {
- expectAnyMaintenanceCalls();
- expectOfferDeclineIn(10);
- expectOfferDeclineIn(10);
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(OFFER_B);
- }
-
- @Test
- public void testNoOffers() {
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- IScheduledTask task = createTask("a");
- expectPreemptorCall(task.getAssignedTask());
- expectReservationCheck(task);
-
- replayAndCreateScheduler();
-
- changeState(task, INIT, PENDING);
- timeoutCapture.getValue().run();
- }
-
- private IScheduledTask createTask(String taskId) {
- return createTask(taskId, null);
- }
-
- private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) {
- return setStatus(makeTask(taskId, TaskTestUtil.JOB), status);
- }
-
- private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) {
- return IScheduledTask.build(task.newBuilder().setStatus(status));
- }
-
- @Test
- public void testLoadFromStorage() {
- final IScheduledTask a = createTask("a", KILLED);
- final IScheduledTask b = createTask("b", PENDING);
- final IScheduledTask c = createTask("c", RUNNING);
-
- expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
- expectTaskRetryIn(10);
-
- replayAndCreateScheduler();
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- public void execute(MutableStoreProvider store) {
- store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
- }
- });
- for (IScheduledTask task : ImmutableList.of(a, b, c)) {
- taskGroups.taskChangedState(TaskStateChange.initialized(task));
- }
- changeState(c, RUNNING, FINISHED);
- }
-
- @Test
- public void testTaskMissing() {
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- replayAndCreateScheduler();
-
- taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT));
- timeoutCapture.getValue().run();
- }
-
- private IExpectationSetters<Assignment> expectMaybeAssign(
- HostOffer offer,
- IScheduledTask task,
- AttributeAggregate jobAggregate) {
-
- return expect(assigner.maybeAssign(
- EasyMock.anyObject(),
- eq(offer),
- eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)),
- eq(Tasks.id(task))));
- }
-
- private IExpectationSetters<?> expectNoReservation(String slaveId) {
- return expect(reservations.get(slaveId)).andReturn(Optional.absent());
- }
-
- private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
- return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
- .andReturn(ImmutableSet.of());
- }
-
- @Test
- public void testTaskAssigned() {
- expectAnyMaintenanceCalls();
- expectOfferDeclineIn(10);
-
- IScheduledTask taskA = createTask("a", PENDING);
- TaskInfo mesosTask = makeTaskInfo(taskA);
-
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectNoReservation(SLAVE_A).times(2);
- expectReservationCheck(taskA);
- expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure());
- expectPreemptorCall(taskA.getAssignedTask());
-
- Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask));
- driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-
- Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- IScheduledTask taskB = createTask("b");
- expectReservationCheck(taskB);
- expectPreemptorCall(taskB.getAssignedTask());
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- changeState(taskA, INIT, PENDING);
- timeoutCapture.getValue().run();
- timeoutCapture2.getValue().run();
-
- // Ensure the offer was consumed.
- changeState(taskB, INIT, PENDING);
- timeoutCapture3.getValue().run();
- }
-
- @Test
- public void testDriverNotReady() {
- IScheduledTask task = createTask("a", PENDING);
- TaskInfo mesosTask = TaskInfo.newBuilder()
- .setName(Tasks.id(task))
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
- .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
- .build();
-
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectAnyMaintenanceCalls();
- expectOfferDeclineIn(10);
- expectNoReservation(SLAVE_A);
- expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
- driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
- expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
- expect(stateManager.changeState(
- EasyMock.anyObject(),
- eq("a"),
- eq(Optional.of(PENDING)),
- eq(LOST),
- eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG)))
- .andReturn(StateChangeResult.SUCCESS);
-
- replayAndCreateScheduler();
-
- changeState(task, INIT, PENDING);
- offerManager.addOffer(OFFER_A);
- timeoutCapture.getValue().run();
- }
-
- @Test
- public void testStorageException() {
- IScheduledTask task = createTask("a", PENDING);
- TaskInfo mesosTask = TaskInfo.newBuilder()
- .setName(Tasks.id(task))
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
- .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
- .build();
-
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectAnyMaintenanceCalls();
- expectOfferDeclineIn(10);
- expectNoReservation(SLAVE_A).times(2);
- expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
-
- Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
- driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
- expectLastCall();
-
- replayAndCreateScheduler();
-
- changeState(task, INIT, PENDING);
- offerManager.addOffer(OFFER_A);
- timeoutCapture.getValue().run();
- timeoutCapture2.getValue().run();
- }
-
- @Test
- public void testExpiration() {
- IScheduledTask task = createTask("a", PENDING);
-
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
- expectAnyMaintenanceCalls();
- expectNoReservation(SLAVE_A);
- expectReservationCheck(task).times(2);
- expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
- Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expectPreemptorCall(task.getAssignedTask());
- driver.declineOffer(OFFER_A.getOffer().getId());
- expectTaskGroupBackoff(10, 20);
- expectPreemptorCall(task.getAssignedTask());
-
- replayAndCreateScheduler();
-
- changeState(task, INIT, PENDING);
- offerManager.addOffer(OFFER_A);
- timeoutCapture.getValue().run();
- offerExpirationCapture.getValue().run();
- timeoutCapture2.getValue().run();
- }
-
- @Test
- public void testOneOfferPerSlave() {
- expectAnyMaintenanceCalls();
- Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
- HostOffer offerAB = new HostOffer(
- Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
- IHostAttributes.build(new HostAttributes()));
-
- driver.declineOffer(OFFER_A.getOffer().getId());
- driver.declineOffer(offerAB.getOffer().getId());
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(offerAB);
- offerExpirationCapture.getValue().run();
- }
-
- @Test
- public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException {
- expectAnyMaintenanceCalls();
- Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
- Function<HostOffer, Assignment> offerAcceptor =
- createMock(new Clazz<Function<HostOffer, Assignment>>() { });
- final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
- expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo));
- driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig())));
- offerExpirationCapture.getValue().run();
- }
-
- @Test
- public void testBasicMaintenancePreferences() {
- expectOffer();
- expectOffer();
- expectOffer();
- expectOffer();
-
- IScheduledTask taskA = createTask("A", PENDING);
- TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expectNoReservation(SLAVE_A);
- expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
- driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
- Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- IScheduledTask taskB = createTask("B", PENDING);
- TaskInfo mesosTaskB = makeTaskInfo(taskB);
- expectNoReservation(SLAVE_B);
- expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
- driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
- Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_D);
- offerManager.addOffer(OFFER_C);
- offerManager.addOffer(OFFER_B);
- offerManager.addOffer(OFFER_A);
-
- changeState(taskA, INIT, PENDING);
- captureA.getValue().run();
-
- changeState(taskB, INIT, PENDING);
- captureB.getValue().run();
- }
-
- @Test
- public void testChangingMaintenancePreferences() {
- expectOffer();
- expectOffer();
- expectOffer();
-
- IScheduledTask taskA = createTask("A", PENDING);
- TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expectNoReservation(SLAVE_B);
- expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
- driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
- Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- IScheduledTask taskB = createTask("B", PENDING);
- TaskInfo mesosTaskB = makeTaskInfo(taskB);
- HostOffer updatedOfferC = new HostOffer(
- OFFER_C.getOffer(),
- IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
- expectNoReservation(SLAVE_C);
- expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
- driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
- Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(OFFER_B);
- offerManager.addOffer(OFFER_C);
-
- // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
-
- // Expected order now (B), with (C, A) unschedulable
- changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
- changeState(taskA, INIT, PENDING);
- captureA.getValue().run();
-
- // Expected order now (C), with (A) unschedulable and (B) already consumed
- changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
- changeState(taskB, INIT, PENDING);
- captureB.getValue().run();
- }
-
- private Capture<String> expectTaskScheduled(IScheduledTask task) {
- TaskInfo mesosTask = makeTaskInfo(task);
- Capture<String> taskId = createCapture();
- expect(assigner.maybeAssign(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- capture(taskId))).andReturn(Assignment.success(mesosTask));
- driver.launchTask(EasyMock.anyObject(), eq(mesosTask));
- return taskId;
- }
-
- @Test
- public void testResistsStarvation() {
- // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside
- // TaskScheduler. It's time to test using a real ScheduledExecutorService.
-
- expectAnyMaintenanceCalls();
-
- IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING);
-
- ScheduledTask jobA1Builder = jobA0.newBuilder();
- jobA1Builder.getAssignedTask().setTaskId("a1");
- jobA1Builder.getAssignedTask().setInstanceId(1);
- IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder);
-
- ScheduledTask jobA2Builder = jobA0.newBuilder();
- jobA2Builder.getAssignedTask().setTaskId("a2");
- jobA2Builder.getAssignedTask().setInstanceId(2);
- IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder);
-
- IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING);
-
- expectNoReservation(SLAVE_A);
- expectNoReservation(SLAVE_B);
-
- expectOfferDeclineIn(10);
- expectOfferDeclineIn(10);
- expectOfferDeclineIn(10);
- expectOfferDeclineIn(10);
-
- Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- Capture<String> firstScheduled = expectTaskScheduled(jobA0);
- Capture<String> secondScheduled = expectTaskScheduled(jobB0);
-
- // Expect another watch of the task group for job A.
- expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(OFFER_B);
- offerManager.addOffer(OFFER_C);
- offerManager.addOffer(OFFER_D);
- changeState(jobA0, INIT, PENDING);
- changeState(jobA1, INIT, PENDING);
- changeState(jobA2, INIT, PENDING);
- changeState(jobB0, INIT, PENDING);
- timeoutA.getValue().run();
- timeoutB.getValue().run();
- assertEquals(
- ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
- ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
- }
-
- @Test
- public void testTaskDeleted() {
- expectAnyMaintenanceCalls();
- expectOfferDeclineIn(10);
-
- final IScheduledTask task = createTask("a", PENDING);
-
- Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectNoReservation(SLAVE_A);
- expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
- expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
- expectReservationCheck(task);
- expectPreemptorCall(task.getAssignedTask());
-
- replayAndCreateScheduler();
-
- offerManager.addOffer(OFFER_A);
- changeState(task, INIT, PENDING);
- timeoutCapture.getValue().run();
-
- // Ensure the offer was consumed.
- changeState(task, INIT, PENDING);
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- public void execute(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
- }
- });
- taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
- timeoutCapture.getValue().run();
- }
-
- private TaskInfo makeTaskInfo(IScheduledTask task) {
- return TaskInfo.newBuilder()
- .setName(Tasks.id(task))
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
- .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString()))
- .build();
- }
-
- private void expectAnyMaintenanceCalls() {
- expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
- }
-
- private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
- offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
- IHostAttributes.build(attributes.newBuilder().setMode(mode))));
- }
-
- private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
- Offer offer = Offers.makeOffer(offerId, hostName);
- return new HostOffer(
- offer,
- IHostAttributes.build(new HostAttributes()
- .setHost(hostName)
- .setSlaveId(offer.getSlaveId().getValue())
- .setAttributes(ImmutableSet.of())
- .setMode(mode)));
- }
-
- private void expectPreemptorCall(IAssignedTask task) {
- expect(preemptor.attemptPreemptionFor(
- eq(task),
- eq(EMPTY),
- EasyMock.anyObject())).andReturn(Optional.absent());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index f98818f..c9c6f5d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.state;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -20,17 +21,19 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ExecutorConfig;
import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -46,11 +49,16 @@ import org.apache.mesos.Protos.Value.Type;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class TaskAssignerImplTest extends EasyMockTest {
@@ -74,8 +82,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
.setAssignedTask(new AssignedTask()
.setTaskId("taskId")
.setTask(new TaskConfig()
+ .setJob(new JobKey("r", "e", "n"))
.setExecutorConfig(new ExecutorConfig().setData("opaque data"))
.setRequestedPorts(ImmutableSet.of(PORT_NAME)))));
+ private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
.setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
@@ -86,19 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
private StateManager stateManager;
private SchedulingFilter filter;
private MesosTaskFactory taskFactory;
+ private OfferManager offerManager;
private TaskAssigner assigner;
@Before
public void setUp() throws Exception {
storeProvider = createMock(MutableStoreProvider.class);
- stateManager = createMock(StateManager.class);
filter = createMock(SchedulingFilter.class);
taskFactory = createMock(MesosTaskFactory.class);
- assigner = new TaskAssignerImpl(stateManager, filter, taskFactory);
+ stateManager = createMock(StateManager.class);
+ offerManager = createMock(OfferManager.class);
+ assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager);
}
@Test
- public void testAssignNoVetoes() {
+ public void testAssignNoVetoes() throws Exception {
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+ offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
expect(filter.filter(
new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
@@ -115,17 +129,18 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(
- Assignment.success(TASK_INFO),
- assigner.maybeAssign(
- storeProvider,
- OFFER,
- new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
- Tasks.id(TASK)));
+ assertTrue(assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ Tasks.id(TASK),
+ Optional.of(MESOS_OFFER.getSlaveId().getValue())));
}
@Test
- public void testAssignVetoes() {
+ public void testAssignVetoesWithStaticBan() throws Exception {
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+ offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
expect(filter.filter(
new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
@@ -133,12 +148,79 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(
- Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))),
- assigner.maybeAssign(
- storeProvider,
- OFFER,
- new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
- Tasks.id(TASK)));
+ assertFalse(assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ Tasks.id(TASK),
+ Optional.<String>absent()));
+ }
+
+ @Test
+ public void testAssignVetoesWithNoStaticBan() throws Exception {
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+ expect(filter.filter(
+ new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+ .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
+
+ control.replay();
+
+ assertFalse(assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ Tasks.id(TASK),
+ Optional.<String>absent()));
+ }
+
+ @Test
+ public void testAssignmentClearedOnError() throws Exception {
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+ offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+ expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+ expect(filter.filter(
+ new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+ .andReturn(ImmutableSet.of());
+ expect(stateManager.assignTask(
+ storeProvider,
+ Tasks.id(TASK),
+ MESOS_OFFER.getHostname(),
+ MESOS_OFFER.getSlaveId(),
+ ImmutableMap.of(PORT_NAME, PORT)))
+ .andReturn(TASK.getAssignedTask());
+ expect(stateManager.changeState(
+ storeProvider,
+ Tasks.id(TASK),
+ Optional.of(PENDING),
+ LOST,
+ LAUNCH_FAILED_MSG))
+ .andReturn(StateChangeResult.SUCCESS);
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+ .andReturn(TASK_INFO);
+
+ control.replay();
+
+ assertFalse(assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ Tasks.id(TASK),
+ Optional.<String>absent()));
+ }
+
+ @Test
+ public void testAssignmentSkippedForReservedSlave() throws Exception {
+ expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+
+ control.replay();
+
+ assertFalse(assigner.maybeAssign(
+ storeProvider,
+ new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+ TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ Tasks.id(TASK),
+ Optional.of("invalid")));
}
}
[2/2] aurora git commit: Centralizing offer/task matching in
TaskAssigner.
Posted by ma...@apache.org.
Centralizing offer/task matching in TaskAssigner.
Bugs closed: AURORA-1416
Reviewed at https://reviews.apache.org/r/37001/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/fb032506
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/fb032506
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/fb032506
Branch: refs/heads/master
Commit: fb032506529894628d9e0f85a0ded095c938bf49
Parents: 1c0086f
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon Aug 3 11:46:58 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon Aug 3 11:46:58 2015 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 5 +-
.../benchmark/fakes/FakeOfferManager.java | 20 +-
.../aurora/scheduler/offers/OfferManager.java | 148 ++--
.../scheduler/scheduling/SchedulingModule.java | 2 +-
.../scheduler/scheduling/TaskScheduler.java | 101 +--
.../aurora/scheduler/state/TaskAssigner.java | 220 +++---
.../scheduler/offers/OfferManagerImplTest.java | 164 +++--
.../scheduling/TaskSchedulerImplTest.java | 216 +++---
.../scheduler/scheduling/TaskSchedulerTest.java | 671 -------------------
.../scheduler/state/TaskAssignerImplTest.java | 122 +++-
10 files changed, 487 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 5bc73d5..d75f090 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -38,6 +38,7 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -117,7 +118,9 @@ public class SchedulingBenchmarks {
new PrivateModule() {
@Override
protected void configure() {
- bind(ScheduledExecutorService.class).toInstance(executor);
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(AsyncModule.AsyncExecutor.class)
+ .toInstance(executor);
bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
bind(OfferManager.OfferReturnDelay.class).toInstance(
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f413301..fbd24ea 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -13,14 +13,12 @@
*/
package org.apache.aurora.benchmark.fakes;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.mesos.Protos;
public class FakeOfferManager implements OfferManager {
@@ -30,15 +28,23 @@ public class FakeOfferManager implements OfferManager {
}
@Override
- public void cancelOffer(Protos.OfferID offer) {
+ public void cancelOffer(Protos.OfferID offerId) {
// no-op
}
@Override
- public boolean launchFirst(
- Function<HostOffer, TaskAssigner.Assignment> acceptor,
- TaskGroupKey groupKey) throws LaunchException {
- return false;
+ public void launchTask(Protos.OfferID offerId, Protos.TaskInfo taskInfo) throws LaunchException {
+ // no-op
+ }
+
+ @Override
+ public void banOffer(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ // no-op
+ }
+
+ @Override
+ public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 14bf265..4b8a55f 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -29,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -47,8 +47,8 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.Protos;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
@@ -76,22 +76,27 @@ public interface OfferManager extends EventSubscriber {
* Invalidates an offer. This indicates that the scheduler should not attempt to match any
* tasks against the offer.
*
- * @param offer Canceled offer.
+ * @param offerId Canceled offer.
*/
- void cancelOffer(OfferID offer);
+ void cancelOffer(OfferID offerId);
/**
- * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
+ * Exclude an offer that results in a static mismatch from further attempts to match against all
+ * tasks from the same group.
*
- * @param acceptor Function that determines if an offer is accepted.
- * @param groupKey Task group key.
- * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
- * {@code acceptor}.
- * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
- * task.
+ * @param offerId Offer ID to exclude for the given {@code groupKey}.
+ * @param groupKey Task group key to exclude.
*/
- boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
- throws LaunchException;
+ void banOffer(OfferID offerId, TaskGroupKey groupKey);
+
+ /**
+ * Launches the task matched against the offer.
+ *
+ * @param offerId Matched offer ID.
+ * @param task Matched task info.
+ * @throws LaunchException If there was an error launching the task.
+ */
+ void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
/**
* Notifies the offer queue that a host's attributes have changed.
@@ -108,6 +113,14 @@ public interface OfferManager extends EventSubscriber {
Iterable<HostOffer> getOffers();
/**
+ * Gets all offers that are not statically banned for the given {@code groupKey}.
+ *
+ * @param groupKey Task group key to check offers for.
+ * @return A snapshot of all offers eligible for the given {@code groupKey}.
+ */
+ Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+
+ /**
* Gets an offer for the given slave ID.
*
* @param slaveId Slave ID to get offer for.
@@ -127,7 +140,8 @@ public interface OfferManager extends EventSubscriber {
* Thrown when there was an unexpected failure trying to launch a task.
*/
class LaunchException extends Exception {
- LaunchException(String msg) {
+ @VisibleForTesting
+ public LaunchException(String msg) {
super(msg);
}
@@ -218,6 +232,11 @@ public interface OfferManager extends EventSubscriber {
}
@Override
+ public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+ return hostOffers.getWeaklyConsistentOffers(groupKey);
+ }
+
+ @Override
public Optional<HostOffer> getOffer(SlaveID slaveId) {
return hostOffers.get(slaveId);
}
@@ -268,7 +287,7 @@ public interface OfferManager extends EventSubscriber {
private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
// TODO(maxim): Expose via a debug endpoint. AURORA-1136.
// Keep track of offer->groupKey mappings that will never be matched to avoid redundant
- // scheduling attempts. See Assignment.Result for more details on static ban.
+ // scheduling attempts. See VetoGroup for more details on static ban.
private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
HostOffers() {
@@ -304,7 +323,7 @@ public interface OfferManager extends EventSubscriber {
if (offer != null) {
// Remove and re-add a host's offer to re-sort based on its new hostStatus
remove(offer.getOffer().getId());
- add(new HostOffer(offer.getOffer(), attributes));
+ add(new HostOffer(offer.getOffer(), attributes));
}
}
@@ -312,27 +331,14 @@ public interface OfferManager extends EventSubscriber {
return Iterables.unmodifiableIterable(offers);
}
- synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
- boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine(String.format(
- "Host offer %s is statically banned for %s: %s",
- offer,
- groupKey,
- result));
- }
- return result;
+ synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
+ return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
+ e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey)));
}
- synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
- OfferID offerId = offer.getOffer().getId();
+ synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
if (offersById.containsKey(offerId)) {
staticallyBannedOffers.put(offerId, groupKey);
-
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine(
- String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
- }
}
}
@@ -345,63 +351,31 @@ public interface OfferManager extends EventSubscriber {
}
}
- @Timed("offer_queue_launch_first")
@Override
- public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
- throws LaunchException {
-
- // It's important that this method is not called concurrently - doing so would open up the
- // possibility of a race between the same offers being accepted by different threads.
-
- for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
- if (!hostOffers.isStaticallyBanned(offer, groupKey)
- && acceptOffer(offer, acceptor, groupKey)) {
- return true;
- }
- }
-
- return false;
+ public void banOffer(OfferID offerId, TaskGroupKey groupKey) {
+ hostOffers.addStaticGroupBan(offerId, groupKey);
}
- @Timed("offer_queue_accept_offer")
- protected boolean acceptOffer(
- HostOffer offer,
- Function<HostOffer, Assignment> acceptor,
- TaskGroupKey groupKey) throws LaunchException {
-
- Assignment assignment = acceptor.apply(offer);
- switch (assignment.getResult()) {
-
- case SUCCESS:
- // Guard against an offer being removed after we grabbed it from the iterator.
- // If that happens, the offer will not exist in hostOffers, and we can immediately
- // send it back to LOST for quick reschedule.
- // Removing while iterating counts on the use of a weakly-consistent iterator being used,
- // which is a feature of ConcurrentSkipListSet.
- if (hostOffers.remove(offer.getOffer().getId())) {
- try {
- driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
- return true;
- } catch (IllegalStateException e) {
- // TODO(William Farner): Catch only the checked exception produced by Driver
- // once it changes from throwing IllegalStateException when the driver is not yet
- // registered.
- throw new LaunchException("Failed to launch task.", e);
- }
- } else {
- offerRaces.incrementAndGet();
- throw new LaunchException(
- "Accepted offer no longer exists in offer queue, likely data race.");
- }
-
- case FAILURE_STATIC_MISMATCH:
- // Exclude an offer that results in a static mismatch from further attempts to match
- // against all tasks from the same group.
- hostOffers.addStaticGroupBan(offer, groupKey);
- return false;
-
- default:
- return false;
+ @Timed("offer_manager_launch_task")
+ @Override
+ public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+ // Guard against an offer being removed after we grabbed it from the iterator.
+ // If that happens, the offer will not exist in hostOffers, and we can immediately
+ // send it back to LOST for quick reschedule.
+ // Removing while iterating counts on the use of a weakly-consistent iterator being used,
+ // which is a feature of ConcurrentSkipListSet.
+ if (hostOffers.remove(offerId)) {
+ try {
+ driver.launchTask(offerId, task);
+ } catch (IllegalStateException e) {
+ // TODO(William Farner): Catch only the checked exception produced by Driver
+ // once it changes from throwing IllegalStateException when the driver is not yet
+ // registered.
+ throw new LaunchException("Failed to launch task.", e);
+ }
+ } else {
+ offerRaces.incrementAndGet();
+ throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index c7a1a46..b9dccc6 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule {
install(new PrivateModule() {
@Override
protected void configure() {
- bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
+ bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class);
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d4bd529..0f0bfca 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -23,26 +23,21 @@ import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.stats.Stats;
-import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -56,7 +51,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
/**
@@ -91,11 +85,9 @@ public interface TaskScheduler extends EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
private final Storage storage;
- private final StateManager stateManager;
private final TaskAssigner assigner;
- private final OfferManager offerManager;
private final Preemptor preemptor;
- private final BiCache<String, TaskGroupKey> reservations;
+ private final BiCache<TaskGroupKey, String> reservations;
private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
@@ -104,54 +96,16 @@ public interface TaskScheduler extends EventSubscriber {
@Inject
TaskSchedulerImpl(
Storage storage,
- StateManager stateManager,
TaskAssigner assigner,
- OfferManager offerManager,
Preemptor preemptor,
- BiCache<String, TaskGroupKey> reservations) {
+ BiCache<TaskGroupKey, String> reservations) {
this.storage = requireNonNull(storage);
- this.stateManager = requireNonNull(stateManager);
this.assigner = requireNonNull(assigner);
- this.offerManager = requireNonNull(offerManager);
this.preemptor = requireNonNull(preemptor);
this.reservations = requireNonNull(reservations);
}
- private Function<HostOffer, Assignment> getAssignerFunction(
- final MutableStoreProvider storeProvider,
- final ResourceRequest resourceRequest,
- final String taskId) {
-
- // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
- // and perform the assignment at the very end. This will allow us to use optimistic locking
- // at the top of the stack and avoid holding the write lock for too long.
- return new Function<HostOffer, Assignment>() {
- @Override
- public Assignment apply(HostOffer offer) {
- Optional<TaskGroupKey> reservation =
- reservations.get(offer.getOffer().getSlaveId().getValue());
-
- if (reservation.isPresent()) {
- if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
- // Slave is reserved to satisfy this task group.
- return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
- } else {
- // Slave is reserved for another task.
- return Assignment.failure();
- }
- } else {
- // Slave is not reserved.
- return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
- }
- }
- };
- }
-
- @VisibleForTesting
- static final Optional<String> LAUNCH_FAILED_MSG =
- Optional.of("Unknown exception attempting to schedule task.");
-
@Timed("task_schedule_attempt")
@Override
public boolean schedule(final String taskId) {
@@ -186,35 +140,22 @@ public interface TaskScheduler extends EventSubscriber {
} else {
ITaskConfig task = assignedTask.getTask();
AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
- try {
- boolean launched = offerManager.launchFirst(
- getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
- TaskGroupKey.from(task));
-
- if (!launched) {
- // Task could not be scheduled.
- // TODO(maxim): Now that preemption slots are searched asynchronously, consider
- // retrying a launch attempt within the current scheduling round IFF a reservation is
- // available.
- maybePreemptFor(assignedTask, aggregate, store);
- attemptsNoMatch.incrementAndGet();
- return false;
- }
- } catch (OfferManager.LaunchException e) {
- LOG.log(Level.WARNING, "Failed to launch task.", e);
- attemptsFailed.incrementAndGet();
-
- // The attempt to schedule the task failed, so we need to backpedal on the
- // assignment.
- // It is in the LOST state and a new task will move to PENDING to replace it.
- // Should the state change fail due to storage issues, that's okay. The task will
- // time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(
- store,
- taskId,
- Optional.of(PENDING),
- LOST,
- LAUNCH_FAILED_MSG);
+
+ boolean launched = assigner.maybeAssign(
+ store,
+ new ResourceRequest(task, aggregate),
+ TaskGroupKey.from(task),
+ taskId,
+ reservations.get(TaskGroupKey.from(task)));
+
+ if (!launched) {
+ // Task could not be scheduled.
+ // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+ // retrying a launch attempt within the current scheduling round IFF a reservation is
+ // available.
+ maybePreemptFor(assignedTask, aggregate, store);
+ attemptsNoMatch.incrementAndGet();
+ return false;
}
}
@@ -226,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber {
AttributeAggregate jobState,
MutableStoreProvider storeProvider) {
- if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+ if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) {
return;
}
Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
if (slaveId.isPresent()) {
- reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+ reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get());
}
}
@@ -240,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber {
if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
if (assigned.getSlaveId() != null) {
- reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+ reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId());
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 3acb45a..0e32990 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -16,20 +16,22 @@ package org.apache.aurora.scheduler.state;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.stats.Stats;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -37,161 +39,64 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.mesos.Protos.TaskInfo;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
/**
- * Responsible for matching a task against an offer.
+ * Responsible for matching a task against an offer and launching it.
*/
public interface TaskAssigner {
-
- final class Assignment {
-
- public enum Result {
- /**
- * Assignment successful.
- */
- SUCCESS,
-
- /**
- * Assignment failed.
- */
- FAILURE,
-
- /**
- * Assignment failed with static mismatch (i.e. all {@link Veto} instances group
- * as {@link VetoGroup}).
- * @see VetoGroup#STATIC
- */
- FAILURE_STATIC_MISMATCH,
- }
-
- private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent();
- private static final ImmutableSet<Veto> NO_VETOES = ImmutableSet.of();
- private final Optional<TaskInfo> taskInfo;
- private final Set<Veto> vetoes;
-
- private Assignment(Optional<TaskInfo> taskInfo, Set<Veto> vetoes) {
- this.taskInfo = taskInfo;
- this.vetoes = vetoes;
- }
-
- /**
- * Creates a successful assignment instance.
- *
- * @param taskInfo {@link TaskInfo} to launch.
- * @return A successful {@link Assignment}.
- */
- public static Assignment success(TaskInfo taskInfo) {
- return new Assignment(Optional.of(taskInfo), NO_VETOES);
- }
-
- /**
- * Creates a failed assignment instance with a set of {@link Veto} applied.
- *
- * @param vetoes Set of {@link Veto} instances issued for the failed offer/task match.
- * @return A failed {@link Assignment}.
- */
- public static Assignment failure(Set<Veto> vetoes) {
- return new Assignment(NO_TASK_INFO, MorePreconditions.checkNotBlank(vetoes));
- }
-
- /**
- * Creates a failed assignment instance.
- *
- * @return A failed {@link Assignment}.
- */
- public static Assignment failure() {
- return new Assignment(NO_TASK_INFO, NO_VETOES);
- }
-
- /**
- * Generates the {@link Result} based on the assignment details.
- *
- * @return An assignment {@link Result}.
- */
- public Result getResult() {
- if (taskInfo.isPresent()) {
- return Result.SUCCESS;
- }
-
- return Veto.identifyGroup(vetoes) == VetoGroup.STATIC
- ? Result.FAILURE_STATIC_MISMATCH
- : Result.FAILURE;
- }
-
- /**
- * A {@link TaskInfo} to launch.
- *
- * @return Optional of {@link TaskInfo}.
- */
- public Optional<TaskInfo> getTaskInfo() {
- return taskInfo;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Assignment)) {
- return false;
- }
-
- Assignment other = (Assignment) o;
-
- return Objects.equal(taskInfo, other.taskInfo)
- && Objects.equal(vetoes, other.vetoes);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(taskInfo, vetoes);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("taskInfo", taskInfo)
- .add("vetoes", vetoes)
- .toString();
- }
- }
-
/**
- * Tries to match a task against an offer. If a match is found, the assigner should
- * make the appropriate changes to the task and provide an {@link Assignment} result.
+ * Tries to match a task against an offer. If a match is found, the assigner makes the
+ * appropriate changes to the task and requests task launch.
*
* @param storeProvider Storage provider.
- * @param offer The resource offer.
* @param resourceRequest The request for resources being scheduled.
+ * @param groupKey Task group key.
* @param taskId Task id to assign.
- * @return {@link Assignment} with assignment result.
+ * @param slaveReservation Slave reservation for a given {@code groupKey}.
+ * @return Assignment result.
*/
- Assignment maybeAssign(
+ boolean maybeAssign(
MutableStoreProvider storeProvider,
- HostOffer offer,
ResourceRequest resourceRequest,
- String taskId);
+ TaskGroupKey groupKey,
+ String taskId,
+ Optional<String> slaveReservation);
class TaskAssignerImpl implements TaskAssigner {
private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
+ @VisibleForTesting
+ static final Optional<String> LAUNCH_FAILED_MSG =
+ Optional.of("Unknown exception attempting to schedule task.");
+
+ private final AtomicLong launchFailures = Stats.exportLong("assigner_launch_failures");
+
private final StateManager stateManager;
private final SchedulingFilter filter;
private final MesosTaskFactory taskFactory;
+ private final OfferManager offerManager;
@Inject
public TaskAssignerImpl(
StateManager stateManager,
SchedulingFilter filter,
- MesosTaskFactory taskFactory) {
+ MesosTaskFactory taskFactory,
+ OfferManager offerManager) {
this.stateManager = requireNonNull(stateManager);
this.filter = requireNonNull(filter);
this.taskFactory = requireNonNull(taskFactory);
+ this.offerManager = requireNonNull(offerManager);
}
private TaskInfo assign(
@@ -225,26 +130,61 @@ public interface TaskAssigner {
}
@Override
- public Assignment maybeAssign(
+ public boolean maybeAssign(
MutableStoreProvider storeProvider,
- HostOffer offer,
ResourceRequest resourceRequest,
- String taskId) {
-
- Set<Veto> vetoes = filter.filter(
- new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
- resourceRequest);
- if (vetoes.isEmpty()) {
- return Assignment.success(assign(
- storeProvider,
- offer.getOffer(),
- resourceRequest.getRequestedPorts(),
- taskId));
- } else {
- LOG.fine("Slave " + offer.getOffer().getHostname()
- + " vetoed task " + taskId + ": " + vetoes);
- return Assignment.failure(vetoes);
+ TaskGroupKey groupKey,
+ String taskId,
+ Optional<String> slaveReservation) {
+
+ for (HostOffer offer : offerManager.getOffers(groupKey)) {
+ if (slaveReservation.isPresent()
+ && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue())) {
+ // Task group has a slave reserved but this offer is for a different slave -> skip.
+ continue;
+ }
+ Set<Veto> vetoes = filter.filter(
+ new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+ resourceRequest);
+ if (vetoes.isEmpty()) {
+ TaskInfo taskInfo = assign(
+ storeProvider,
+ offer.getOffer(),
+ resourceRequest.getRequestedPorts(),
+ taskId);
+
+ try {
+ offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+ return true;
+ } catch (OfferManager.LaunchException e) {
+ LOG.log(Level.WARNING, "Failed to launch task.", e);
+ launchFailures.incrementAndGet();
+
+ // The attempt to schedule the task failed, so we need to backpedal on the
+ // assignment.
+ // It is in the LOST state and a new task will move to PENDING to replace it.
+ // Should the state change fail due to storage issues, that's okay. The task will
+ // time out in the ASSIGNED state and be moved to LOST.
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.of(PENDING),
+ LOST,
+ LAUNCH_FAILED_MSG);
+ return false;
+ }
+ } else {
+ if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
+ // Never attempt to match this offer/groupKey pair again.
+ offerManager.banOffer(offer.getOffer().getId(), groupKey);
+ }
+
+ LOG.fine("Slave " + offer.getOffer().getHostname()
+ + " vetoed task " + taskId + ": " + vetoes);
+ return false;
+ }
}
+ return false;
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 04be32e..088a4a6 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -16,7 +16,7 @@ package org.apache.aurora.scheduler.offers;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
-import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.testing.TearDown;
@@ -31,32 +31,34 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class OfferManagerImplTest extends EasyMockTest {
private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
private static final String HOST_A = "HOST_A";
+ private static final IHostAttributes HOST_ATTRIBUTES_A =
+ IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A));
private static final HostOffer OFFER_A = new HostOffer(
Offers.makeOffer("OFFER_A", HOST_A),
- IHostAttributes.build(new HostAttributes().setMode(NONE)));
+ HOST_ATTRIBUTES_A);
+ private static final Protos.OfferID OFFER_A_ID = OFFER_A.getOffer().getId();
private static final String HOST_B = "HOST_B";
private static final HostOffer OFFER_B = new HostOffer(
Offers.makeOffer("OFFER_B", HOST_B),
@@ -67,10 +69,10 @@ public class OfferManagerImplTest extends EasyMockTest {
IHostAttributes.build(new HostAttributes().setMode(NONE)));
private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
+ private static final TaskInfo TASK_INFO = TaskInfo.getDefaultInstance();
private Driver driver;
private FakeScheduledExecutor clock;
- private Function<HostOffer, Assignment> offerAcceptor;
private OfferManagerImpl offerManager;
@Before
@@ -92,7 +94,6 @@ public class OfferManagerImplTest extends EasyMockTest {
clock.assertEmpty();
}
});
- offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { });
OfferReturnDelay returnDelay = new OfferReturnDelay() {
@Override
public Amount<Long, Time> get() {
@@ -109,11 +110,9 @@ public class OfferManagerImplTest extends EasyMockTest {
HostOffer offerA = setMode(OFFER_A, DRAINING);
HostOffer offerC = setMode(OFFER_C, DRAINING);
- TaskInfo task = TaskInfo.getDefaultInstance();
- expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
- driver.launchTask(OFFER_B.getOffer().getId(), task);
+ driver.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
- driver.declineOffer(offerA.getOffer().getId());
+ driver.declineOffer(OFFER_A_ID);
driver.declineOffer(offerC.getOffer().getId());
control.replay();
@@ -121,98 +120,165 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.addOffer(offerA);
offerManager.addOffer(OFFER_B);
offerManager.addOffer(offerC);
- assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ assertEquals(
+ ImmutableSet.of(OFFER_B, offerA, offerC),
+ ImmutableSet.copyOf(offerManager.getOffers()));
+ offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
clock.advance(RETURN_DELAY);
}
@Test
- public void testGetOffersReturnsAllOffers() throws Exception {
- expect(offerAcceptor.apply(OFFER_A))
- .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+ public void hostAttributeChangeUpdatesOfferSorting() throws Exception {
+ driver.declineOffer(OFFER_A_ID);
+ driver.declineOffer(OFFER_B.getOffer().getId());
control.replay();
+ offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
+
offerManager.addOffer(OFFER_A);
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ offerManager.addOffer(OFFER_B);
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
- offerManager.cancelOffer(OFFER_A.getOffer().getId());
- assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+ HostOffer offerA = setMode(OFFER_A, DRAINING);
+ offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+ assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers()));
+
+ offerA = setMode(OFFER_A, NONE);
+ HostOffer offerB = setMode(OFFER_B, DRAINING);
+ offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+ offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
clock.advance(RETURN_DELAY);
}
@Test
- public void testOfferFilteringDueToStaticBan() throws Exception {
- expect(offerAcceptor.apply(OFFER_A))
- .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+ public void testAddSameSlaveOffer() {
+ driver.declineOffer(OFFER_A_ID);
+ expectLastCall().times(2);
- TaskInfo task = TaskInfo.getDefaultInstance();
- expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
- driver.launchTask(OFFER_B.getOffer().getId(), task);
+ control.replay();
- driver.declineOffer(OFFER_A.getOffer().getId());
+ offerManager.addOffer(OFFER_A);
+ offerManager.addOffer(OFFER_A);
+
+ clock.advance(RETURN_DELAY);
+ }
+ @Test
+ public void testGetOffersReturnsAllOffers() throws Exception {
control.replay();
offerManager.addOffer(OFFER_A);
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
- // Run again to make sure all offers are banned (via no expectations set).
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- // Add a new offer to accept the task previously banned for OFFER_A.
- offerManager.addOffer(OFFER_B);
- assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ offerManager.cancelOffer(OFFER_A_ID);
+ assertTrue(Iterables.isEmpty(offerManager.getOffers()));
clock.advance(RETURN_DELAY);
}
@Test
- public void testStaticBanIsCleared() throws Exception {
- expect(offerAcceptor.apply(OFFER_A))
- .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
+ public void testOfferFilteringDueToStaticBan() throws Exception {
+ driver.declineOffer(OFFER_A_ID);
- TaskInfo task = TaskInfo.getDefaultInstance();
- expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
- driver.launchTask(OFFER_A.getOffer().getId(), task);
+ control.replay();
- expect(offerAcceptor.apply(OFFER_A))
- .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
+ // Static ban ignored when now offers.
+ offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+ offerManager.addOffer(OFFER_A);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
- expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
- driver.launchTask(OFFER_A.getOffer().getId(), task);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- driver.declineOffer(OFFER_A.getOffer().getId());
+ // Add static ban.
+ offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+
+ clock.advance(RETURN_DELAY);
+ }
+
+ @Test
+ public void testStaticBanIsClearedOnOfferReturn() throws Exception {
+ driver.declineOffer(OFFER_A_ID);
+ expectLastCall().times(2);
control.replay();
offerManager.addOffer(OFFER_A);
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
// Make sure the static ban is cleared when the offers are returned.
clock.advance(RETURN_DELAY);
offerManager.addOffer(OFFER_A);
- assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+
+ clock.advance(RETURN_DELAY);
+ }
+
+ @Test
+ public void testStaticBanIsClearedOnDriverDisconnect() throws Exception {
+ driver.declineOffer(OFFER_A_ID);
+
+ control.replay();
offerManager.addOffer(OFFER_A);
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
// Make sure the static ban is cleared when driver is disconnected.
offerManager.driverDisconnected(new DriverDisconnected());
offerManager.addOffer(OFFER_A);
- assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
clock.advance(RETURN_DELAY);
}
@Test
+ public void getOffer() {
+ driver.declineOffer(OFFER_A_ID);
+
+ control.replay();
+
+ offerManager.addOffer(OFFER_A);
+ assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getSlaveId()));
+ clock.advance(RETURN_DELAY);
+ }
+
+ @Test(expected = OfferManager.LaunchException.class)
+ public void testLaunchTaskDriverThrows() throws OfferManager.LaunchException {
+ driver.launchTask(OFFER_A_ID, TASK_INFO);
+ expectLastCall().andThrow(new IllegalStateException());
+
+ control.replay();
+
+ offerManager.addOffer(OFFER_A);
+
+ try {
+ offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+ } finally {
+ clock.advance(RETURN_DELAY);
+ }
+ }
+
+ @Test(expected = OfferManager.LaunchException.class)
+ public void testLaunchTaskOfferRaceThrows() throws OfferManager.LaunchException {
+ control.replay();
+ offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+ }
+
+ @Test
public void testFlushOffers() throws Exception {
control.replay();
offerManager.addOffer(OFFER_A);
offerManager.addOffer(OFFER_B);
offerManager.driverDisconnected(new DriverDisconnected());
- assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
clock.advance(RETURN_DELAY);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index a2e2d4c..350ec6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -24,10 +24,7 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.Clock;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -37,27 +34,19 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.Offers;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Before;
@@ -67,10 +56,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -78,34 +65,22 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private static final IScheduledTask TASK_A =
TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
- private static final IScheduledTask TASK_B =
- TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b"));
- private static final HostOffer OFFER = new HostOffer(
- Offers.makeOffer("OFFER_A", "HOST_A"),
- IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-
- private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue();
-
- private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
- private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
+ private static final String SLAVE_ID = "HOST_A";
+ private static final Optional<String> NO_RESERVATION = Optional.absent();
private StorageTestUtil storageUtil;
- private StateManager stateManager;
private TaskAssigner assigner;
- private OfferManager offerManager;
private TaskScheduler scheduler;
private Preemptor preemptor;
- private BiCache<String, TaskGroupKey> reservations;
+ private BiCache<TaskGroupKey, String> reservations;
private EventSink eventSink;
@Before
public void setUp() throws Exception {
storageUtil = new StorageTestUtil(this);
- stateManager = createMock(StateManager.class);
assigner = createMock(TaskAssigner.class);
- offerManager = createMock(OfferManager.class);
preemptor = createMock(Preemptor.class);
- reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
+ reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { });
Injector injector = getInjector(storageUtil.storage);
scheduler = injector.getInstance(TaskScheduler.class);
@@ -118,11 +93,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
new AbstractModule() {
@Override
protected void configure() {
- bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
+ bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations);
bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
bind(Preemptor.class).toInstance(preemptor);
- bind(OfferManager.class).toInstance(offerManager);
- bind(StateManager.class).toInstance(stateManager);
bind(TaskAssigner.class).toInstance(assigner);
bind(Clock.class).toInstance(createMock(Clock.class));
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
@@ -138,89 +111,100 @@ public class TaskSchedulerImplTest extends EasyMockTest {
ImmutableSet.of(task));
}
- private void expectAssigned(IScheduledTask task) {
- expect(assigner.maybeAssign(
+ private IExpectationSetters<Boolean> expectAssigned(
+ IScheduledTask task,
+ Optional<String> reservation) {
+
+ return expect(assigner.maybeAssign(
storageUtil.mutableStoreProvider,
- OFFER,
new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
- Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+ TaskGroupKey.from(task.getAssignedTask().getTask()),
+ Tasks.id(task),
+ reservation));
+ }
+
+ @Test
+ public void testSchedule() throws Exception {
+ storageUtil.expectOperations();
+
+ expectReservationCheck(TASK_A);
+ expectTaskStillPendingQuery(TASK_A);
+ expectActiveJobFetch(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
+
+ control.replay();
+
+ assertTrue(scheduler.schedule("a"));
+ }
+
+ @Test
+ public void testScheduleNoTask() throws Exception {
+ storageUtil.expectOperations();
+ storageUtil.expectTaskFetch(
+ Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING),
+ ImmutableSet.of());
+
+ control.replay();
+
+ assertTrue(scheduler.schedule("a"));
}
@Test
public void testReservation() throws Exception {
storageUtil.expectOperations();
+ // No reservation available in preemptor
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectLaunchAttempt(false);
- // Reserve "a" with offerA
- expectReservationCheck(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectReservationCheck(TASK_A).times(2);
+ expectPreemptorCall(TASK_A, NO_RESERVATION);
+
+ // Slave is reserved.
+ expectTaskStillPendingQuery(TASK_A);
+ expectActiveJobFetch(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectReservationCheck(TASK_A).times(2);
expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
- expectAddReservation(SLAVE_ID, TASK_A);
+ expectAddReservation(TASK_A, SLAVE_ID);
// Use previously created reservation.
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectGetReservation(SLAVE_ID, TASK_A);
- expectAssigned(TASK_A);
- AssignmentCapture assignment = expectLaunchAttempt(true);
+ expectGetReservation(TASK_A, SLAVE_ID);
+ expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true);
control.replay();
assertFalse(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule("a"));
assertTrue(scheduler.schedule("a"));
- assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
}
@Test
- public void testReservationExpires() throws Exception {
+ public void testReservationUnusable() throws Exception {
storageUtil.expectOperations();
expectTaskStillPendingQuery(TASK_A);
expectActiveJobFetch(TASK_A);
- expectLaunchAttempt(false);
- // Reserve "a" with offerA
expectReservationCheck(TASK_A);
- expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
- expectAddReservation(SLAVE_ID, TASK_A);
-
- // First attempt -> reservation is active.
- expectTaskStillPendingQuery(TASK_B);
- expectActiveJobFetch(TASK_B);
- AssignmentCapture firstAssignment = expectLaunchAttempt(false);
- expectGetReservation(SLAVE_ID, TASK_A);
- expectReservationCheck(TASK_B);
- expectPreemptorCall(TASK_B, Optional.absent());
-
- // Status changed -> reservation removed.
- reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
-
- // Second attempt -> reservation expires.
- expectGetNoReservation(SLAVE_ID);
- expectTaskStillPendingQuery(TASK_B);
- expectActiveJobFetch(TASK_B);
- AssignmentCapture secondAssignment = expectLaunchAttempt(true);
- expectAssigned(TASK_B);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectGetReservation(TASK_A, SLAVE_ID);
control.replay();
assertFalse(scheduler.schedule("a"));
- assertFalse(scheduler.schedule("b"));
- assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
-
- eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING));
- assertTrue(scheduler.schedule("b"));
- assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
}
@Test
- public void testReservationUnusable() throws Exception {
+ public void testReservationRemoved() throws Exception {
storageUtil.expectOperations();
expectTaskStillPendingQuery(TASK_A);
- expectLaunchAttempt(false);
- expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
- .andReturn(ImmutableSet.of(SLAVE_ID));
+ expectActiveJobFetch(TASK_A);
+ expectReservationCheck(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+ expectGetReservation(TASK_A, SLAVE_ID);
control.replay();
@@ -236,17 +220,18 @@ public class TaskSchedulerImplTest extends EasyMockTest {
@Test
public void testPendingDeletedHandled() throws Exception {
+ reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID);
+
control.replay();
- IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
- eventSink.post(TaskStateChange.transition(task, PENDING));
+ ScheduledTask taskBuilder = TASK_A.newBuilder().setStatus(PENDING);
+ taskBuilder.getAssignedTask().setSlaveId(SLAVE_ID);
+ eventSink.post(TaskStateChange.transition(IScheduledTask.build(taskBuilder), PENDING));
}
@Test
public void testIgnoresThrottledTasks() throws Exception {
- // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
- // to the assigner function.
-
+ // Ensures that tasks in THROTTLED state are not considered part of the active job state.
Storage memStorage = DbUtil.createStorage();
Injector injector = getInjector(memStorage);
@@ -265,23 +250,31 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
});
- expectGetNoReservation(SLAVE_ID);
- AssignmentCapture assignment = expectLaunchAttempt(true);
+ expectReservationCheck(TASK_A);
expect(assigner.maybeAssign(
EasyMock.anyObject(),
- eq(OFFER),
eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
- eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+ eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
+ eq(Tasks.id(taskA)),
+ eq(NO_RESERVATION))).andReturn(true);
control.replay();
assertTrue(scheduler.schedule(Tasks.id(taskA)));
- assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
}
- private static class AssignmentCapture {
- public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
- public Capture<TaskGroupKey> groupKey = createCapture();
+ @Test
+ public void testScheduleThrows() throws Exception {
+ storageUtil.expectOperations();
+
+ expectReservationCheck(TASK_A);
+ expectTaskStillPendingQuery(TASK_A);
+ expectActiveJobFetch(TASK_A);
+ expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected"));
+
+ control.replay();
+
+ assertFalse(scheduler.schedule("a"));
}
private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
@@ -291,31 +284,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
storageUtil.mutableStoreProvider)).andReturn(result);
}
- private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
- throws OfferManager.LaunchException {
-
- AssignmentCapture capture = new AssignmentCapture();
- expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
- .andReturn(taskLaunched);
- return capture;
- }
-
- private IScheduledTask assign(IScheduledTask task, String slaveId) {
- ScheduledTask result = task.newBuilder();
- result.getAssignedTask().setSlaveId(slaveId);
- return IScheduledTask.build(result);
- }
-
- private void assignAndAssert(
- Result result,
- TaskGroupKey groupKey,
- HostOffer offer,
- AssignmentCapture capture) {
-
- assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
- assertEquals(groupKey, capture.groupKey.getValue());
- }
-
private void expectActiveJobFetch(IScheduledTask task) {
storageUtil.expectTaskFetch(
Query.jobScoped(((Function<IScheduledTask, IJobKey>) Tasks::getJob).apply(task))
@@ -323,21 +291,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
ImmutableSet.of());
}
- private void expectAddReservation(String slaveId, IScheduledTask task) {
- reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
- }
-
- private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) {
- return expect(reservations.get(slaveId))
- .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
+ private void expectAddReservation(IScheduledTask task, String slaveId) {
+ reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId);
}
- private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
- return expect(reservations.get(slaveId)).andReturn(Optional.absent());
+ private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) {
+ return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+ .andReturn(Optional.of(slaveId));
}
private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
- return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
- .andReturn(ImmutableSet.of());
+ return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+ .andReturn(Optional.<String>absent());
}
}