You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/21 23:46:48 UTC
[1/2] incubator-aurora git commit: Extract a cluster state
abstraction from PreemptorImpl.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 91accd62f -> ecc3fbcac
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
new file mode 100644
index 0000000..65581ba
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -0,0 +1,666 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import static org.apache.mesos.Protos.Offer;
+import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorImplTest extends EasyMockTest {
+
+ private static final String USER_A = "user_a";
+ private static final String USER_B = "user_b";
+ private static final String USER_C = "user_c";
+ private static final String JOB_A = "job_a";
+ private static final String JOB_B = "job_b";
+ private static final String JOB_C = "job_c";
+ private static final String TASK_ID_A = "task_a";
+ private static final String TASK_ID_B = "task_b";
+ private static final String TASK_ID_C = "task_c";
+ private static final String TASK_ID_D = "task_d";
+ private static final String HOST_A = "host_a";
+ private static final String RACK_A = "rackA";
+ private static final String RACK_ATTRIBUTE = "rack";
+ private static final String HOST_ATTRIBUTE = "host";
+ private static final String OFFER_A = "offer_a";
+
+ private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
+
+ private StorageTestUtil storageUtil;
+ private StateManager stateManager;
+ private SchedulingFilter schedulingFilter;
+ private FakeClock clock;
+ private StatsProvider statsProvider;
+ private OfferQueue offerQueue;
+ private AttributeAggregate emptyJob;
+
+ @Before
+ public void setUp() {
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+ stateManager = createMock(StateManager.class);
+ clock = new FakeClock();
+ statsProvider = new FakeStatsProvider();
+ offerQueue = createMock(OfferQueue.class);
+ emptyJob = new AttributeAggregate(
+ Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+ createMock(AttributeStore.class));
+ }
+
+ private void runPreemptor(ScheduledTask pendingTask) {
+ PreemptorImpl preemptor = new PreemptorImpl(
+ storageUtil.storage,
+ stateManager,
+ offerQueue,
+ schedulingFilter,
+ PREEMPTION_DELAY,
+ clock,
+ statsProvider,
+ new LiveClusterState(storageUtil.storage));
+
+ preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
+ }
+
+ // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
+
+ private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
+ Iterable<String> taskIds = FluentIterable.from(Arrays.asList(returnedTasks))
+ .transform(IScheduledTask.FROM_BUILDER)
+ .transform(Tasks.SCHEDULED_TO_ID);
+ storageUtil.expectTaskFetch(
+ Query.statusScoped(PENDING).byId(taskIds),
+ IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
+ }
+
+ private void expectGetActiveTasks(ScheduledTask... returnedTasks) {
+ storageUtil.expectTaskFetch(
+ LiveClusterState.CANDIDATE_QUERY,
+ IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
+ }
+
+ @Test
+ public void testPreempted() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
+ schedulingFilter = createMock(SchedulingFilter.class);
+ ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
+ runOnHost(lowPriority, HOST_A);
+
+ ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(highPriority);
+ expectGetActiveTasks(lowPriority);
+
+ expectFiltering();
+ expectPreempted(lowPriority);
+
+ control.replay();
+ runPreemptor(highPriority);
+ }
+
+ @Test
+ public void testLowestPriorityPreempted() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
+ schedulingFilter = createMock(SchedulingFilter.class);
+ ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
+ runOnHost(lowPriority, HOST_A); // NOTE(review): not part of the active-task fetch below.
+
+ ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
+ runOnHost(lowerPriority, HOST_A);
+
+ ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(highPriority);
+ expectGetActiveTasks(lowerPriority, lowerPriority); // NOTE(review): lowPriority omitted?
+
+ expectFiltering();
+ expectPreempted(lowerPriority); // Only the priority-1 task is expected to be preempted.
+
+ control.replay();
+ runPreemptor(highPriority);
+ }
+
+ @Test
+ public void testOnePreemptableTask() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
+ schedulingFilter = createMock(SchedulingFilter.class);
+ ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
+ runOnHost(highPriority, HOST_A);
+
+ ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
+ runOnHost(lowerPriority, HOST_A);
+
+ ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
+ runOnHost(lowestPriority, HOST_A);
+
+ ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(pendingPriority);
+ expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
+
+ expectFiltering();
+ expectPreempted(lowestPriority);
+
+ control.replay();
+ runPreemptor(pendingPriority);
+ }
+
+ @Test
+ public void testHigherPriorityRunning() throws Exception {
+ schedulingFilter = createMock(SchedulingFilter.class);
+ ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+ runOnHost(highPriority, HOST_A);
+
+ ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(task);
+ expectGetActiveTasks(highPriority);
+
+ control.replay();
+ runPreemptor(task);
+ }
+
+ @Test
+ public void testProductionPreemptingNonproduction() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
+ schedulingFilter = createMock(SchedulingFilter.class);
+ // Use a very low priority for the production task to show that priority is irrelevant.
+ ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
+ runOnHost(a1, HOST_A);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(p1);
+ expectGetActiveTasks(a1);
+
+ expectFiltering();
+ expectPreempted(a1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ @Test
+ public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
+ schedulingFilter = createMock(SchedulingFilter.class);
+ // Use a very low priority for the production task to show that priority is irrelevant.
+ ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+ ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
+ runOnHost(a1, HOST_A);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(p1);
+ expectGetActiveTasks(a1);
+
+ expectFiltering();
+ expectPreempted(a1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ @Test
+ public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+ schedulingFilter = createMock(SchedulingFilter.class);
+ ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
+ ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
+ runOnHost(a1, HOST_A);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(p1);
+ expectGetActiveTasks(a1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ // Ensures a production task can preempt 2 tasks on the same host.
+ @Test
+ public void testProductionPreemptingManyNonProduction() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+ b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ setUpHost(HOST_A, RACK_A);
+
+ runOnHost(a1, HOST_A);
+ runOnHost(b1, HOST_A);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(p1);
+ expectGetActiveTasks(a1, b1);
+
+ expectPreempted(a1);
+ expectPreempted(b1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ // Ensures we select the minimal number of tasks to preempt
+ @Test
+ public void testMinimalSetPreempted() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
+
+ ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+ b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
+ b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ setUpHost(HOST_A, RACK_A);
+
+ runOnHost(a1, HOST_A);
+ runOnHost(b1, HOST_A);
+ runOnHost(b2, HOST_A);
+
+ ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetPendingTasks(p1);
+ expectGetActiveTasks(b1, b2, a1);
+
+ expectPreempted(a1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ // Ensures a production task *never* preempts a production task from another job.
+ @Test
+ public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+ ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ setUpHost(HOST_A, RACK_A);
+
+ runOnHost(p1, HOST_A);
+
+ ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
+ p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectNoOffers();
+
+ expectGetActiveTasks(p1);
+ expectGetPendingTasks(p2);
+
+ control.replay();
+ runPreemptor(p2);
+ }
+
+ // Ensures that we can preempt if a task + offer can satisfy a pending task.
+ @Test
+ public void testPreemptWithOfferAndTask() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+
+ setUpHost(HOST_A, RACK_A);
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ runOnHost(a1, HOST_A);
+
+ Offer o1 = makeOffer(OFFER_A, HOST_A, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1);
+ expectOffers(o1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectGetActiveTasks(a1);
+ expectGetPendingTasks(p1);
+
+ expectPreempted(a1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+ @Test
+ public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+
+ setUpHost(HOST_A, RACK_A);
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ runOnHost(a1, HOST_A);
+
+ ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
+ a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ runOnHost(a2, HOST_A);
+
+ Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+ expectOffers(o1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectGetActiveTasks(a1, a2);
+ expectGetPendingTasks(p1);
+
+ expectPreempted(a1);
+ expectPreempted(a2);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
+ @Test
+ public void testPreemptWithLargeOffer() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl();
+
+ setUpHost(HOST_A, RACK_A);
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ runOnHost(a1, HOST_A);
+
+ Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(2048L, Data.MB), Amount.of(1L, Data.MB), 1);
+ expectOffers(o1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
+
+ clock.advance(PREEMPTION_DELAY);
+
+ expectGetActiveTasks(a1);
+ expectGetPendingTasks(p1);
+
+ control.replay();
+ runPreemptor(p1);
+ }
+
+ @Test
+ public void testIgnoresThrottledTasks() throws Exception {
+ // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
+ schedulingFilter = createMock(SchedulingFilter.class);
+
+ Storage storage = MemStorage.newEmptyStorage();
+
+ final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
+ throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+ final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
+
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider store) {
+ store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+ IScheduledTask.build(pending),
+ IScheduledTask.build(throttled)));
+ }
+ });
+
+ clock.advance(PREEMPTION_DELAY);
+
+ control.replay();
+
+ PreemptorImpl preemptor = new PreemptorImpl(
+ storage,
+ stateManager,
+ offerQueue,
+ schedulingFilter,
+ PREEMPTION_DELAY,
+ clock,
+ statsProvider,
+ new LiveClusterState(storage));
+
+ assertEquals(
+ Optional.<String>absent(),
+ preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
+ }
+
+ // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
+
+ private Offer makeOffer(String offerId,
+ String host,
+ double cpu,
+ Amount<Long, Data> ram,
+ Amount<Long, Data> disk,
+ int numPorts) {
+ List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+ Offer.Builder builder = Offer.newBuilder();
+ builder.getIdBuilder().setValue(offerId);
+ builder.getFrameworkIdBuilder().setValue("framework-id");
+ builder.getSlaveIdBuilder().setValue(hostToId(host));
+ builder.setHostname(host);
+ for (Resource r: resources) {
+ builder.addResources(r);
+ }
+ return builder.build();
+ }
+
+ private void expectOffers(Offer ... offers) {
+ Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
+ .transform(new Function<Offer, HostOffer>() {
+ @Override
+ public HostOffer apply(Offer offer) {
+ return new HostOffer(
+ offer,
+ IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
+ }
+ });
+ expect(offerQueue.getOffers()).andReturn(hostOffers);
+ }
+
+ private void expectNoOffers() {
+ expect(offerQueue.getOffers()).andReturn(ImmutableList.<HostOffer>of());
+ }
+
+ private IExpectationSetters<Set<Veto>> expectFiltering() {
+ return expect(schedulingFilter.filter(
+ EasyMock.<UnusedResource>anyObject(),
+ EasyMock.<ResourceRequest>anyObject()))
+ .andAnswer(
+ new IAnswer<Set<Veto>>() {
+ @Override
+ public Set<Veto> answer() {
+ return ImmutableSet.of();
+ }
+ });
+ }
+
+ private void expectPreempted(ScheduledTask preempted) throws Exception {
+ expect(stateManager.changeState(
+ eq(storageUtil.mutableStoreProvider),
+ eq(Tasks.id(preempted)),
+ eq(Optional.<ScheduleStatus>absent()),
+ eq(ScheduleStatus.PREEMPTING),
+ EasyMock.<Optional<String>>anyObject()))
+ .andReturn(true);
+ }
+
+ private ScheduledTask makeProductionTask(String role, String job, String taskId) {
+ return makeTask(role, job, taskId, 0, "prod", true);
+ }
+
+ private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
+ return makeTask(role, job, taskId, priority, "prod", true);
+ }
+
+ private ScheduledTask makeTask(String role, String job, String taskId, int priority,
+ String env, boolean production) {
+ AssignedTask assignedTask = new AssignedTask()
+ .setTaskId(taskId)
+ .setTask(new TaskConfig()
+ .setJob(new JobKey(role, env, job))
+ .setOwner(new Identity(role, role))
+ .setPriority(priority)
+ .setProduction(production)
+ .setJobName(job)
+ .setEnvironment(env)
+ .setConstraints(new HashSet<Constraint>()));
+ ScheduledTask scheduledTask = new ScheduledTask()
+ .setStatus(PENDING)
+ .setAssignedTask(assignedTask);
+ addEvent(scheduledTask, PENDING);
+ return scheduledTask;
+ }
+
+ private ScheduledTask makeTask(String role, String job, String taskId) {
+ return makeTask(role, job, taskId, 0, "dev", false);
+ }
+
+ private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
+ return makeTask(role, job, taskId, priority, "dev", false);
+ }
+
+ private void addEvent(ScheduledTask task, ScheduleStatus status) {
+ task.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
+ }
+
+ // Slave Hostname to a slave id
+ private String hostToId(String hostname) {
+ return hostname + "_id";
+ }
+
+ private void runOnHost(ScheduledTask task, String host) {
+ task.setStatus(RUNNING);
+ addEvent(task, RUNNING);
+ task.getAssignedTask().setSlaveHost(host);
+ task.getAssignedTask().setSlaveId(hostToId(host));
+ }
+
+ private Attribute host(String host) {
+ return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
+ }
+
+ private Attribute rack(String rack) {
+ return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
+ }
+
+ // Sets up a normal host, no dedicated hosts and no maintenance; slave id comes from hostToId.
+ private void setUpHost(String host, String rack) {
+ IHostAttributes hostAttrs = IHostAttributes.build(
+ new HostAttributes().setHost(host).setSlaveId(hostToId(host))
+ .setMode(NONE).setAttributes(ImmutableSet.of(rack(rack), host(host))));
+
+ expect(this.storageUtil.attributeStore.getHostAttributes(host))
+ .andReturn(Optional.of(hostAttrs)).anyTimes(); // Lookup count is not asserted.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
new file mode 100644
index 0000000..020b671
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.twitter.common.application.StartupStage;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorModuleTest extends EasyMockTest {
+
+ private StorageTestUtil storageUtil;
+
+ @Before
+ public void setUp() {
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+ }
+
+ private Injector createInjector(Module module) {
+ return Guice.createInjector(
+ module,
+ new LifecycleModule(),
+ new AbstractModule() {
+ private <T> void bindMock(Class<T> clazz) {
+ bind(clazz).toInstance(createMock(clazz));
+ }
+
+ @Override
+ protected void configure() {
+ bindMock(SchedulingFilter.class);
+ bindMock(StateManager.class);
+ bindMock(TaskAssigner.class);
+ bindMock(Thread.UncaughtExceptionHandler.class);
+ bind(Storage.class).toInstance(storageUtil.storage);
+ }
+ });
+ }
+
+ @Test
+ public void testPreemptorDisabled() throws Exception {
+ Injector injector = createInjector(new PreemptorModule(false));
+
+ Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
+ createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
+ AttributeStore attributeStore = createMock(AttributeStore.class);
+
+ control.replay();
+
+ injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+
+ injector.getBindings(); // NOTE(review): result unused; confirm the call is needed.
+ assertEquals(
+ Optional.<String>absent(), // Expected: no preemption slot is returned.
+ injector.getInstance(Preemptor.class)
+ .findPreemptionSlotFor("a", new AttributeAggregate(taskSupplier, attributeStore)));
+ }
+}
[2/2] incubator-aurora git commit: Extract a cluster state
abstraction from PreemptorImpl.
Posted by wf...@apache.org.
Extract a cluster state abstraction from PreemptorImpl.
Bugs closed: AURORA-121
Reviewed at https://reviews.apache.org/r/28310/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ecc3fbca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ecc3fbca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ecc3fbca
Branch: refs/heads/master
Commit: ecc3fbcacd5497c9da3695f1d04677a542958137
Parents: 91accd6
Author: Bill Farner <wf...@apache.org>
Authored: Fri Nov 21 14:42:28 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 21 14:42:28 2014 -0800
----------------------------------------------------------------------
.../apache/aurora/scheduler/app/AppModule.java | 2 +
.../aurora/scheduler/async/AsyncModule.java | 61 +-
.../aurora/scheduler/async/Preemptor.java | 434 ------------
.../aurora/scheduler/async/TaskScheduler.java | 1 +
.../scheduler/async/preemptor/ClusterState.java | 34 +
.../async/preemptor/LiveClusterState.java | 68 ++
.../scheduler/async/preemptor/Preemptor.java | 396 +++++++++++
.../async/preemptor/PreemptorModule.java | 84 +++
.../aurora/scheduler/async/AsyncModuleTest.java | 31 +-
.../scheduler/async/PreemptorImplTest.java | 663 ------------------
.../scheduler/async/TaskSchedulerImplTest.java | 5 +-
.../scheduler/async/TaskSchedulerTest.java | 1 +
.../async/preemptor/LiveClusterStateTest.java | 76 +++
.../async/preemptor/PreemptorImplTest.java | 666 +++++++++++++++++++
.../async/preemptor/PreemptorModuleTest.java | 90 +++
15 files changed, 1426 insertions(+), 1186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 6f1cf47..360e161 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,6 +47,7 @@ import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.SchedulerModule;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.http.JettyServerModule;
@@ -118,6 +119,7 @@ public class AppModule extends AbstractModule {
install(new MetadataModule());
install(new QuotaModule());
install(new JettyServerModule());
+ install(new PreemptorModule());
install(new SchedulerDriverModule());
install(new SchedulerServicesModule());
install(new SchedulerModule());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 03cbe24..7f2c760 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -31,7 +31,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
-import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
import com.twitter.common.args.Arg;
@@ -56,7 +55,6 @@ import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSetting
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
@@ -64,8 +62,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
/**
@@ -152,15 +148,6 @@ public class AsyncModule extends AbstractModule {
private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
Arg.create(Amount.of(30, Time.SECONDS));
- @CmdLine(name = "preemption_delay",
- help = "Time interval after which a pending task becomes eligible to preempt other tasks")
- private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
- Arg.create(Amount.of(10L, Time.MINUTES));
-
- @CmdLine(name = "enable_preemptor",
- help = "Enable the preemptor and preemption")
- private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
-
@CmdLine(name = "job_update_history_per_job_threshold",
help = "Maximum number of completed job updates to retain in a job update history.")
private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
@@ -181,28 +168,11 @@ public class AsyncModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> INITIAL_TASK_KILL_RETRY_INTERVAL =
Arg.create(Amount.of(5L, Time.SECONDS));
- private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
- @Override
- public Optional<String> findPreemptionSlotFor(
- String taskId,
- AttributeAggregate attributeAggregate) {
-
- return Optional.absent();
- }
- };
-
@CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
+ "trying to satisfy a task preempting another.")
private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
Arg.create(Amount.of(3L, Time.MINUTES));
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- private @interface PreemptionBinding { }
-
- @VisibleForTesting
- static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
-
@CmdLine(name = "executor_gc_interval",
help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
@@ -215,17 +185,6 @@ public class AsyncModule extends AbstractModule {
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
private @interface AsyncExecutor { }
- private final boolean enablePreemptor;
-
- @VisibleForTesting
- AsyncModule(boolean enablePreemptor) {
- this.enablePreemptor = enablePreemptor;
- }
-
- public AsyncModule() {
- this(ENABLE_PREEMPTOR.get());
- }
-
@VisibleForTesting
static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
@@ -274,22 +233,11 @@ public class AsyncModule extends AbstractModule {
bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
expose(RescheduleCalculator.class);
- if (enablePreemptor) {
- bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
- bind(PreemptorImpl.class).in(Singleton.class);
- LOG.info("Preemptor Enabled.");
- } else {
- bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
- LOG.warning("Preemptor Disabled.");
- }
- expose(PREEMPTOR_KEY);
- bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
- .toInstance(PREEMPTION_DELAY.get());
bind(TaskGroups.class).in(Singleton.class);
expose(TaskGroups.class);
}
});
- bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
+ bindTaskScheduler(binder(), RESERVATION_DURATION.get());
PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
install(new PrivateModule() {
@@ -390,15 +338,10 @@ public class AsyncModule extends AbstractModule {
* well with the MultiBinder that backs the PubSub system.
*/
@VisibleForTesting
- static void bindTaskScheduler(
- Binder binder,
- final Key<Preemptor> preemptorKey,
- final Amount<Long, Time> reservationDuration) {
-
+ static void bindTaskScheduler(Binder binder, final Amount<Long, Time> reservationDuration) {
binder.install(new PrivateModule() {
@Override
protected void configure() {
- bind(Preemptor.class).to(preemptorKey);
bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
.toInstance(reservationDuration);
bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
deleted file mode 100644
index 97d5d13..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import static org.apache.aurora.scheduler.storage.Storage.Work;
-
-/**
- * Preempts active tasks in favor of higher priority tasks.
- */
-public interface Preemptor {
-
- /**
- * Preempts active tasks in favor of the input task.
- *
- * @param taskId ID of the preempting task.
- * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
- * @return ID of the slave where preemption occured.
- */
- Optional<String> findPreemptionSlotFor(String taskId, AttributeAggregate attributeAggregate);
-
- /**
- * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
- * priority than tasks that are currently running.
- *
- * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
- * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
- * tasks.
- */
- class PreemptorImpl implements Preemptor {
-
- /**
- * Binding annotation for the time interval after which a pending task becomes eligible to
- * preempt other tasks.
- */
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- @interface PreemptionDelay { }
-
- @VisibleForTesting
- static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
- EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
-
- private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
- // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
- private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
- // Incremented every time we fail to find tasks to preempt for a pending task.
- private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
-
- private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
- @Override
- public boolean apply(IScheduledTask task) {
- return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
- >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
- }
- };
-
- private final Storage storage;
- private final StateManager stateManager;
- private final OfferQueue offerQueue;
- private final SchedulingFilter schedulingFilter;
- private final Amount<Long, Time> preemptionCandidacyDelay;
- private final Clock clock;
- private final AtomicLong missingAttributes;
-
- /**
- * Creates a new preemptor.
- *
- * @param storage Backing store for tasks.
- * @param stateManager Scheduler state controller to instruct when preempting tasks.
- * @param offerQueue Queue that contains available Mesos resource offers.
- * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
- * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
- * tasks.
- * @param clock Clock to check current time.
- */
- @Inject
- PreemptorImpl(
- Storage storage,
- StateManager stateManager,
- OfferQueue offerQueue,
- SchedulingFilter schedulingFilter,
- @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
- Clock clock,
- StatsProvider statsProvider) {
-
- this.storage = requireNonNull(storage);
- this.stateManager = requireNonNull(stateManager);
- this.offerQueue = requireNonNull(offerQueue);
- this.schedulingFilter = requireNonNull(schedulingFilter);
- this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
- this.clock = requireNonNull(clock);
- missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
- }
-
- private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
- return Lists.newArrayList(Iterables.transform(Iterables.filter(
- Storage.Util.consistentFetchTasks(storage, query), filter),
- SCHEDULED_TO_ASSIGNED));
- }
-
- private List<IAssignedTask> fetch(Query.Builder query) {
- return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
- }
-
- private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
- new Function<IAssignedTask, String>() {
- @Override
- public String apply(IAssignedTask input) {
- return input.getSlaveId();
- }
- };
-
- private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
- new Function<IAssignedTask, ResourceSlot>() {
- @Override
- public ResourceSlot apply(IAssignedTask task) {
- return ResourceSlot.from(task.getTask());
- }
- };
-
- private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
- new Function<HostOffer, ResourceSlot>() {
- @Override
- public ResourceSlot apply(HostOffer offer) {
- return ResourceSlot.from(offer.getOffer());
- }
- };
-
- private static final Function<HostOffer, String> OFFER_TO_HOST =
- new Function<HostOffer, String>() {
- @Override
- public String apply(HostOffer offer) {
- return offer.getOffer().getHostname();
- }
- };
-
- private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
- new Function<HostOffer, IHostAttributes>() {
- @Override
- public IHostAttributes apply(HostOffer offer) {
- return offer.getAttributes();
- }
- };
-
- // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
- // ordering
- private static final Ordering<IAssignedTask> RESOURCE_ORDER =
- ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
-
- /**
- * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
- * The empty set indicates the offers (slack) are enough.
- * A set with elements indicates those tasks and the offers are enough.
- */
- private Optional<Set<IAssignedTask>> getTasksToPreempt(
- Iterable<IAssignedTask> possibleVictims,
- Iterable<HostOffer> offers,
- IAssignedTask pendingTask,
- AttributeAggregate jobState) {
-
- // This enforces the precondition that all of the resources are from the same host. We need to
- // get the host for the schedulingFilter.
- Set<String> hosts = ImmutableSet.<String>builder()
- .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
- .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
-
- String host = Iterables.getOnlyElement(hosts);
-
- ResourceSlot slackResources =
- ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
-
- if (!Iterables.isEmpty(offers)) {
- if (Iterables.size(offers) > 1) {
- // There are multiple offers for the same host. Since both have maintenance information
- // we don't preempt with this information and wait for mesos to merge the two offers for
- // us.
- return Optional.absent();
- }
- IHostAttributes attributes = Iterables.getOnlyElement(
- FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
-
- Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
- new UnusedResource(slackResources, attributes),
- new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
-
- if (vetoes.isEmpty()) {
- return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
- }
- }
-
- FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
- .filter(Predicates.compose(
- preemptionFilter(pendingTask.getTask()),
- Tasks.ASSIGNED_TO_INFO));
-
- if (preemptableTasks.isEmpty()) {
- return Optional.absent();
- }
-
- List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
-
- Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
-
- for (IAssignedTask victim : sortedVictims) {
- toPreemptTasks.add(victim);
-
- ResourceSlot totalResource = ResourceSlot.sum(
- ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
- slackResources);
-
- Optional<IHostAttributes> attributes = getHostAttributes(host);
- if (!attributes.isPresent()) {
- missingAttributes.incrementAndGet();
- continue;
- }
-
- Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
- new UnusedResource(totalResource, attributes.get()),
- new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
-
- if (vetoes.isEmpty()) {
- return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
- }
- }
- return Optional.absent();
- }
-
- private Optional<IHostAttributes> getHostAttributes(final String host) {
- return storage.weaklyConsistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
- @Override
- public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
- return storeProvider.getAttributeStore().getHostAttributes(host);
- }
- });
- }
-
- private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
- new Function<HostOffer, String>() {
- @Override
- public String apply(HostOffer offer) {
- return offer.getOffer().getSlaveId().getValue();
- }
- };
-
- /**
- * Order by production flag (true, then false), subsorting by task ID.
- * TODO(wfarner): Re-evaluate - what do we gain from sorting by task ID?
- */
- private static final Ordering<IAssignedTask> SCHEDULING_ORDER =
- Ordering.explicit(true, false)
- .onResultOf(Functions.compose(
- Functions.forPredicate(Tasks.IS_PRODUCTION),
- Tasks.ASSIGNED_TO_INFO))
- .compound(Ordering.natural().onResultOf(Tasks.ASSIGNED_TO_ID));
-
- private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
- // Only non-pending active tasks may be preempted.
- List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
-
- // Walk through the preemption candidates in reverse scheduling order.
- Collections.sort(activeTasks, SCHEDULING_ORDER.reverse());
-
- // Group the tasks by slave id so they can be paired with offers from the same slave.
- return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
- }
-
- @Override
- public synchronized Optional<String> findPreemptionSlotFor(
- String taskId,
- AttributeAggregate attributeAggregate) {
-
- List<IAssignedTask> pendingTasks =
- fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
-
- // Task is no longer PENDING no need to preempt
- if (pendingTasks.isEmpty()) {
- return Optional.absent();
- }
-
- final IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
-
- Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
-
- if (slavesToActiveTasks.isEmpty()) {
- return Optional.absent();
- }
-
- attemptedPreemptions.incrementAndGet();
-
- // Group the offers by slave id so they can be paired with active tasks from the same slave.
- Multimap<String, HostOffer> slavesToOffers =
- Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
-
- Set<String> allSlaves = ImmutableSet.<String>builder()
- .addAll(slavesToOffers.keySet())
- .addAll(slavesToActiveTasks.keySet())
- .build();
-
- for (String slaveID : allSlaves) {
- final Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
- slavesToActiveTasks.get(slaveID),
- slavesToOffers.get(slaveID),
- pendingTask,
- attributeAggregate);
-
- if (toPreemptTasks.isPresent()) {
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- for (IAssignedTask toPreempt : toPreemptTasks.get()) {
- stateManager.changeState(
- storeProvider,
- toPreempt.getTaskId(),
- Optional.<ScheduleStatus>absent(),
- PREEMPTING,
- Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
- tasksPreempted.incrementAndGet();
- }
- }
- });
- return Optional.of(slaveID);
- }
- }
-
- noSlotsFound.incrementAndGet();
- return Optional.absent();
- }
-
- /**
- * Creates a static filter that will identify tasks that may preempt the provided task.
- * A task may preempt another task if the following conditions hold true:
- * - The resources reserved for {@code preemptableTask} are sufficient to satisfy the task.
- * - The tasks are owned by the same user and the priority of {@code preemptableTask} is lower
- * OR {@code preemptableTask} is non-production and the compared task is production.
- *
- * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
- * @return A filter that will compare the priorities and resources required by other tasks
- * with {@code preemptableTask}.
- */
- private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
- return new Predicate<ITaskConfig>() {
- @Override
- public boolean apply(ITaskConfig possibleVictim) {
- boolean pendingIsProduction = pendingTask.isProduction();
- boolean victimIsProduction = possibleVictim.isProduction();
-
- if (pendingIsProduction && !victimIsProduction) {
- return true;
- } else if (pendingIsProduction == victimIsProduction) {
- // If production flags are equal, preemption is based on priority within the same role.
- if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
- return pendingTask.getPriority() > possibleVictim.getPriority();
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
- };
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 626545a..ead9d28 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -42,6 +42,7 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.Clock;
import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
new file mode 100644
index 0000000..4f0019a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+/**
+ * A facade through which the preemptor reads the state of scheduled tasks in the cluster.
+ */
+public interface ClusterState {
+
+ /**
+ * Gets a snapshot of the active tasks in the cluster, indexed by the ID of the slave each
+ * task is assigned to.
+ * <p>
+ * TODO(wfarner): Return a more minimal type than IAssignedTask here.
+ *
+ * @return A multimap from slave ID to the active tasks assigned to that slave.
+ */
+ Multimap<String, IAssignedTask> getSlavesToActiveTasks();
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
new file mode 100644
index 0000000..0da4d2a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.EnumSet;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+class LiveClusterState implements ClusterState {
+ @VisibleForTesting
+ static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
+ new Function<IAssignedTask, String>() {
+ @Override
+ public String apply(IAssignedTask input) {
+ return input.getSlaveId();
+ }
+ };
+
+ @VisibleForTesting
+ static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
+ EnumSet.copyOf(Sets.difference(Tasks.SLAVE_ASSIGNED_STATES, EnumSet.of(PREEMPTING))));
+
+ private final Storage storage;
+
+ @Inject
+ LiveClusterState(Storage storage) {
+ this.storage = requireNonNull(storage);
+ }
+
+ @Override
+ public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+ // Candidates are tasks in a slave-assigned state that are not already PREEMPTING.
+ Iterable<IAssignedTask> activeTasks = Iterables.transform(
+ Storage.Util.consistentFetchTasks(storage, CANDIDATE_QUERY),
+ SCHEDULED_TO_ASSIGNED);
+
+ // Index the tasks by slave ID so they can be paired with offers from the same slave.
+ return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
new file mode 100644
index 0000000..afbd645
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.Util;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * Preempts active tasks in favor of higher priority tasks.
+ */
+public interface Preemptor {
+
+ /**
+ * Preempts active tasks in favor of the input task.
+ *
+ * @param taskId ID of the preempting task.
+ * @param attributeAggregate Attribute information for tasks in the job of task {@code taskId}.
+ * @return ID of the slave where preemption occurred, if any.
+ */
+ Optional<String> findPreemptionSlotFor(String taskId, AttributeAggregate attributeAggregate);
+
+ /**
+ * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
+ * priority than tasks that are currently running.
+ *
+ * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
+ * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
+ * tasks.
+ * <p>
+ * TODO(wfarner): Move this class out of the interface to make it package private.
+ */
+ class PreemptorImpl implements Preemptor {
+
+ /**
+ * Binding annotation for the time interval after which a pending task becomes eligible to
+ * preempt other tasks.
+ */
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ @interface PreemptionDelay { }
+
+ private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
+ // Incremented every time the preemptor is invoked and finds both a pending task and active tasks.
+ private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
+ // Incremented every time we fail to find tasks to preempt for a pending task.
+ private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
+
+ private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
+ @Override
+ public boolean apply(IScheduledTask task) {
+ return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
+ >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+ }
+ };
+
+ private final Storage storage;
+ private final StateManager stateManager;
+ private final OfferQueue offerQueue;
+ private final SchedulingFilter schedulingFilter;
+ private final Amount<Long, Time> preemptionCandidacyDelay;
+ private final Clock clock;
+ private final AtomicLong missingAttributes;
+ private final ClusterState clusterState;
+
+ /**
+ * Creates a new preemptor.
+ *
+ * @param storage Backing store for tasks.
+ * @param stateManager Scheduler state controller to instruct when preempting tasks.
+ * @param offerQueue Queue that contains available Mesos resource offers.
+ * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
+ * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
+ * tasks.
+ * @param clock Clock to check current time.
+ */
+ @Inject
+ public PreemptorImpl(
+ Storage storage,
+ StateManager stateManager,
+ OfferQueue offerQueue,
+ SchedulingFilter schedulingFilter,
+ @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+ Clock clock,
+ StatsProvider statsProvider,
+ ClusterState clusterState) {
+
+ this.storage = requireNonNull(storage);
+ this.stateManager = requireNonNull(stateManager);
+ this.offerQueue = requireNonNull(offerQueue);
+ this.schedulingFilter = requireNonNull(schedulingFilter);
+ this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
+ this.clock = requireNonNull(clock);
+ missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
+ this.clusterState = requireNonNull(clusterState);
+ }
+
+ private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
+ new Function<IAssignedTask, ResourceSlot>() {
+ @Override
+ public ResourceSlot apply(IAssignedTask task) {
+ return ResourceSlot.from(task.getTask());
+ }
+ };
+
+ private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+ new Function<HostOffer, ResourceSlot>() {
+ @Override
+ public ResourceSlot apply(HostOffer offer) {
+ return ResourceSlot.from(offer.getOffer());
+ }
+ };
+
+ private static final Function<HostOffer, String> OFFER_TO_HOST =
+ new Function<HostOffer, String>() {
+ @Override
+ public String apply(HostOffer offer) {
+ return offer.getOffer().getHostname();
+ }
+ };
+
+ private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
+ new Function<HostOffer, IHostAttributes>() {
+ @Override
+ public IHostAttributes apply(HostOffer offer) {
+ return offer.getAttributes();
+ }
+ };
+
+ // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+ // ordering
+ private static final Ordering<IAssignedTask> RESOURCE_ORDER =
+ ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+
+ /**
+ * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
+ * The empty set indicates the offers (slack) are enough.
+ * A set with elements indicates those tasks and the offers are enough.
+ */
+ private Optional<Set<String>> getTasksToPreempt(
+ Iterable<IAssignedTask> possibleVictims,
+ Iterable<HostOffer> offers,
+ IAssignedTask pendingTask,
+ AttributeAggregate jobState) {
+
+ // This enforces the precondition that all of the resources are from the same host. We need to
+ // get the host for the schedulingFilter.
+ Set<String> hosts = ImmutableSet.<String>builder()
+ .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
+ .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
+
+ String host = Iterables.getOnlyElement(hosts);
+
+ ResourceSlot slackResources =
+ ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
+
+ if (!Iterables.isEmpty(offers)) {
+ if (Iterables.size(offers) > 1) {
+ // There are multiple offers for the same host. Since each offer has its own maintenance
+ // information, we don't preempt based on them and instead wait for mesos to merge the
+ // offers for us.
+ return Optional.absent();
+ }
+ IHostAttributes attributes = Iterables.getOnlyElement(
+ FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
+
+ Set<Veto> vetoes = schedulingFilter.filter(
+ new UnusedResource(slackResources, attributes),
+ new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
+
+ if (vetoes.isEmpty()) {
+ return Optional.<Set<String>>of(ImmutableSet.<String>of());
+ }
+ }
+
+ FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
+ .filter(Predicates.compose(
+ preemptionFilter(pendingTask.getTask()),
+ Tasks.ASSIGNED_TO_INFO));
+
+ if (preemptableTasks.isEmpty()) {
+ return Optional.absent();
+ }
+
+ List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+
+ Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+
+ for (IAssignedTask victim : sortedVictims) {
+ toPreemptTasks.add(victim);
+
+ ResourceSlot totalResource = ResourceSlot.sum(
+ ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+ slackResources);
+
+ Optional<IHostAttributes> attributes = getHostAttributes(host);
+ if (!attributes.isPresent()) {
+ missingAttributes.incrementAndGet();
+ continue;
+ }
+
+ Set<Veto> vetoes = schedulingFilter.filter(
+ new UnusedResource(totalResource, attributes.get()),
+ new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
+
+ if (vetoes.isEmpty()) {
+ Set<String> taskIds =
+ FluentIterable.from(toPreemptTasks).transform(Tasks.ASSIGNED_TO_ID).toSet();
+ return Optional.of(taskIds);
+ }
+ }
+ return Optional.absent();
+ }
+
+ private Optional<IHostAttributes> getHostAttributes(final String host) {
+ return storage.weaklyConsistentRead(new Storage.Work.Quiet<Optional<IHostAttributes>>() {
+ @Override
+ public Optional<IHostAttributes> apply(Storage.StoreProvider storeProvider) {
+ return storeProvider.getAttributeStore().getHostAttributes(host);
+ }
+ });
+ }
+
+ private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+ new Function<HostOffer, String>() {
+ @Override
+ public String apply(HostOffer offer) {
+ return offer.getOffer().getSlaveId().getValue();
+ }
+ };
+
+ private Optional<IAssignedTask> fetchIdlePendingTask(String taskId) {
+ Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING);
+ Iterable<IAssignedTask> result = FluentIterable
+ .from(Util.consistentFetchTasks(storage, query))
+ .filter(isIdleTask)
+ .transform(SCHEDULED_TO_ASSIGNED);
+ return Optional.fromNullable(Iterables.getOnlyElement(result, null));
+ }
+
+ @Override
+ public synchronized Optional<String> findPreemptionSlotFor(
+ final String taskId,
+ AttributeAggregate attributeAggregate) {
+
+ final Optional<IAssignedTask> pendingTask = fetchIdlePendingTask(taskId);
+
+ // Task is no longer PENDING, or is not yet eligible to preempt: nothing to do.
+ if (!pendingTask.isPresent()) {
+ return Optional.absent();
+ }
+
+ Multimap<String, IAssignedTask> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
+
+ if (slavesToActiveTasks.isEmpty()) {
+ return Optional.absent();
+ }
+
+ attemptedPreemptions.incrementAndGet();
+
+ // Group the offers by slave id so they can be paired with active tasks from the same slave.
+ Multimap<String, HostOffer> slavesToOffers =
+ Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
+
+ Set<String> allSlaves = ImmutableSet.<String>builder()
+ .addAll(slavesToOffers.keySet())
+ .addAll(slavesToActiveTasks.keySet())
+ .build();
+
+ for (String slaveID : allSlaves) {
+ final Optional<Set<String>> toPreemptTasks = getTasksToPreempt(
+ slavesToActiveTasks.get(slaveID),
+ slavesToOffers.get(slaveID),
+ pendingTask.get(),
+ attributeAggregate);
+
+ if (toPreemptTasks.isPresent()) {
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ for (String toPreempt : toPreemptTasks.get()) {
+ stateManager.changeState(
+ storeProvider,
+ toPreempt,
+ Optional.<ScheduleStatus>absent(),
+ PREEMPTING,
+ Optional.of("Preempting in favor of " + taskId));
+ tasksPreempted.incrementAndGet();
+ }
+ }
+ });
+ return Optional.of(slaveID);
+ }
+ }
+
+ noSlotsFound.incrementAndGet();
+ return Optional.absent();
+ }
+
+ /**
+ * Creates a static filter that will identify tasks that may be preempted by the provided task.
+ * A task may be preempted by {@code pendingTask} if either of the following conditions holds:
+ * - {@code pendingTask} is production and the candidate task is non-production.
+ * - Both tasks have the same production flag and the same role, and the priority of the
+ * candidate task is lower. Resource sufficiency is not checked by this filter.
+ *
+ * @param pendingTask A task that is not scheduled, for which other tasks may be preempted.
+ * @return A filter that will compare the production flag, role and priority of other tasks
+ * with those of {@code pendingTask}.
+ */
+ private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
+ return new Predicate<ITaskConfig>() {
+ @Override
+ public boolean apply(ITaskConfig possibleVictim) {
+ boolean pendingIsProduction = pendingTask.isProduction();
+ boolean victimIsProduction = possibleVictim.isProduction();
+
+ if (pendingIsProduction && !victimIsProduction) {
+ return true;
+ } else if (pendingIsProduction == victimIsProduction) {
+ // If production flags are equal, preemption is based on priority within the same role.
+ if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+ return pendingTask.getPriority() > possibleVictim.getPriority();
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
new file mode 100644
index 0000000..bc96b67
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl.PreemptionDelay;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+
+public class PreemptorModule extends PrivateModule {
+
+ private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
+
+ @CmdLine(name = "enable_preemptor",
+ help = "Enable the preemptor and preemption")
+ private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+ @CmdLine(name = "preemption_delay",
+ help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+ private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+ Arg.create(Amount.of(10L, Time.MINUTES));
+
+ private final boolean enablePreemptor;
+
+ @VisibleForTesting
+ PreemptorModule(boolean enablePreemptor) {
+ this.enablePreemptor = enablePreemptor;
+ }
+
+ public PreemptorModule() {
+ this(ENABLE_PREEMPTOR.get());
+ }
+
+ @Override
+ protected void configure() {
+ if (enablePreemptor) {
+ bind(Preemptor.class).to(PreemptorImpl.class);
+ bind(PreemptorImpl.class).in(Singleton.class);
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
+ .toInstance(PREEMPTION_DELAY.get());
+ bind(ClusterState.class).to(LiveClusterState.class);
+ bind(LiveClusterState.class).in(Singleton.class);
+ LOG.info("Preemptor Enabled.");
+ } else {
+ bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+ LOG.warning("Preemptor Disabled.");
+ }
+
+ expose(Preemptor.class);
+ }
+
+ private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+ @Override
+ public Optional<String> findPreemptionSlotFor(
+ String taskId,
+ AttributeAggregate attributeAggregate) {
+
+ return Optional.absent();
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index e007c30..4ed6b15 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -15,10 +15,7 @@ package org.apache.aurora.scheduler.async;
import java.util.Set;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -26,23 +23,19 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
-import com.twitter.common.application.StartupStage;
import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.Clock;
import org.apache.aurora.scheduler.AppStartup;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
@@ -83,6 +76,7 @@ public class AsyncModuleTest extends EasyMockTest {
bindMock(Driver.class);
bindMock(SchedulingFilter.class);
bindMock(MaintenanceController.class);
+ bindMock(Preemptor.class);
bindMock(StateManager.class);
bindMock(TaskAssigner.class);
bindMock(Thread.UncaughtExceptionHandler.class);
@@ -93,7 +87,7 @@ public class AsyncModuleTest extends EasyMockTest {
@Test
public void testBindings() throws Exception {
- Injector injector = createInjector(new AsyncModule(true));
+ Injector injector = createInjector(new AsyncModule());
control.replay();
@@ -110,23 +104,4 @@ public class AsyncModuleTest extends EasyMockTest {
statsProvider.getAllValues()
);
}
-
- @Test
- public void testPreemptorDisabled() throws Exception {
- Injector injector = createInjector(new AsyncModule(false));
-
- Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
- createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
- AttributeStore attributeStore = createMock(AttributeStore.class);
-
- control.replay();
-
- injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
-
- injector.getBindings();
- assertEquals(
- Optional.<String>absent(),
- injector.getInstance(AsyncModule.PREEMPTOR_KEY)
- .findPreemptionSlotFor("a", new AttributeAggregate(taskSupplier, attributeStore)));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
deleted file mode 100644
index 69108cf..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import static org.apache.mesos.Protos.Offer;
-import static org.apache.mesos.Protos.Resource;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class PreemptorImplTest extends EasyMockTest {
-
- private static final String USER_A = "user_a";
- private static final String USER_B = "user_b";
- private static final String USER_C = "user_c";
- private static final String JOB_A = "job_a";
- private static final String JOB_B = "job_b";
- private static final String JOB_C = "job_c";
- private static final String TASK_ID_A = "task_a";
- private static final String TASK_ID_B = "task_b";
- private static final String TASK_ID_C = "task_c";
- private static final String TASK_ID_D = "task_d";
- private static final String HOST_A = "host_a";
- private static final String RACK_A = "rackA";
- private static final String RACK_ATTRIBUTE = "rack";
- private static final String HOST_ATTRIBUTE = "host";
- private static final String OFFER_A = "offer_a";
-
- private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
-
- private StorageTestUtil storageUtil;
- private StateManager stateManager;
- private SchedulingFilter schedulingFilter;
- private FakeClock clock;
- private StatsProvider statsProvider;
- private OfferQueue offerQueue;
- private AttributeAggregate emptyJob;
-
- @Before
- public void setUp() {
- storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
- stateManager = createMock(StateManager.class);
- clock = new FakeClock();
- statsProvider = new FakeStatsProvider();
- offerQueue = createMock(OfferQueue.class);
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
- }
-
- private void runPreemptor(ScheduledTask pendingTask) {
- PreemptorImpl preemptor = new PreemptorImpl(
- storageUtil.storage,
- stateManager,
- offerQueue,
- schedulingFilter,
- PREEMPTION_DELAY,
- clock,
- statsProvider);
-
- preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
- }
-
- // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
-
- private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
- Iterable<String> taskIds = FluentIterable.from(Arrays.asList(returnedTasks))
- .transform(IScheduledTask.FROM_BUILDER)
- .transform(Tasks.SCHEDULED_TO_ID);
- storageUtil.expectTaskFetch(
- Query.statusScoped(PENDING).byId(taskIds),
- IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
- }
-
- private void expectGetActiveTasks(ScheduledTask... returnedTasks) {
- storageUtil.expectTaskFetch(
- PreemptorImpl.CANDIDATE_QUERY,
- IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
- }
-
- @Test
- public void testPreempted() throws Exception {
- setUpHost(HOST_A, RACK_A);
-
- schedulingFilter = createMock(SchedulingFilter.class);
- ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
- runOnHost(lowPriority, HOST_A);
-
- ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(highPriority);
- expectGetActiveTasks(lowPriority);
-
- expectFiltering();
- expectPreempted(lowPriority);
-
- control.replay();
- runPreemptor(highPriority);
- }
-
- @Test
- public void testLowestPriorityPreempted() throws Exception {
- setUpHost(HOST_A, RACK_A);
-
- schedulingFilter = createMock(SchedulingFilter.class);
- ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
- runOnHost(lowPriority, HOST_A);
-
- ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
- runOnHost(lowerPriority, HOST_A);
-
- ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(highPriority);
- expectGetActiveTasks(lowerPriority, lowerPriority);
-
- expectFiltering();
- expectPreempted(lowerPriority);
-
- control.replay();
- runPreemptor(highPriority);
- }
-
- @Test
- public void testOnePreemptableTask() throws Exception {
- setUpHost(HOST_A, RACK_A);
-
- schedulingFilter = createMock(SchedulingFilter.class);
- ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
- runOnHost(highPriority, HOST_A);
-
- ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
- runOnHost(lowerPriority, HOST_A);
-
- ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
- runOnHost(lowestPriority, HOST_A);
-
- ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(pendingPriority);
- expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
-
- expectFiltering();
- expectPreempted(lowestPriority);
-
- control.replay();
- runPreemptor(pendingPriority);
- }
-
- @Test
- public void testHigherPriorityRunning() throws Exception {
- schedulingFilter = createMock(SchedulingFilter.class);
- ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
- runOnHost(highPriority, HOST_A);
-
- ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(task);
- expectGetActiveTasks(highPriority);
-
- control.replay();
- runPreemptor(task);
- }
-
- @Test
- public void testProductionPreemptingNonproduction() throws Exception {
- setUpHost(HOST_A, RACK_A);
-
- schedulingFilter = createMock(SchedulingFilter.class);
- // Use a very low priority for the production task to show that priority is irrelevant.
- ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
- runOnHost(a1, HOST_A);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(p1);
- expectGetActiveTasks(a1);
-
- expectFiltering();
- expectPreempted(a1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- @Test
- public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
- setUpHost(HOST_A, RACK_A);
-
- schedulingFilter = createMock(SchedulingFilter.class);
- // Use a very low priority for the production task to show that priority is irrelevant.
- ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
- ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
- runOnHost(a1, HOST_A);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(p1);
- expectGetActiveTasks(a1);
-
- expectFiltering();
- expectPreempted(a1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- @Test
- public void testProductionUsersDoNotPreemptEachOther() throws Exception {
- schedulingFilter = createMock(SchedulingFilter.class);
- ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
- ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
- runOnHost(a1, HOST_A);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(p1);
- expectGetActiveTasks(a1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- // Ensures a production task can preempt 2 tasks on the same host.
- @Test
- public void testProductionPreemptingManyNonProduction() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
- a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
- b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- setUpHost(HOST_A, RACK_A);
-
- runOnHost(a1, HOST_A);
- runOnHost(b1, HOST_A);
-
- ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(p1);
- expectGetActiveTasks(a1, b1);
-
- expectPreempted(a1);
- expectPreempted(b1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- // Ensures we select the minimal number of tasks to preempt
- @Test
- public void testMinimalSetPreempted() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
- a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
-
- ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
- b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
- b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- setUpHost(HOST_A, RACK_A);
-
- runOnHost(a1, HOST_A);
- runOnHost(b1, HOST_A);
- runOnHost(b2, HOST_A);
-
- ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetPendingTasks(p1);
- expectGetActiveTasks(b1, b2, a1);
-
- expectPreempted(a1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- // Ensures a production task *never* preempts a production task from another job.
- @Test
- public void testProductionJobNeverPreemptsProductionJob() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
- ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
- setUpHost(HOST_A, RACK_A);
-
- runOnHost(p1, HOST_A);
-
- ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
- p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectNoOffers();
-
- expectGetActiveTasks(p1);
- expectGetPendingTasks(p2);
-
- control.replay();
- runPreemptor(p2);
- }
-
- // Ensures that we can preempt if a task + offer can satisfy a pending task.
- @Test
- public void testPreemptWithOfferAndTask() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
-
- setUpHost(HOST_A, RACK_A);
-
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
- a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
- runOnHost(a1, HOST_A);
-
- Offer o1 = makeOffer(OFFER_A, HOST_A, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1);
- expectOffers(o1);
-
- ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectGetActiveTasks(a1);
- expectGetPendingTasks(p1);
-
- expectPreempted(a1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
- @Test
- public void testPreemptWithOfferAndMultipleTasks() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
-
- setUpHost(HOST_A, RACK_A);
-
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
- a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
- runOnHost(a1, HOST_A);
-
- ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
- a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
- runOnHost(a2, HOST_A);
-
- Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
- expectOffers(o1);
-
- ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectGetActiveTasks(a1, a2);
- expectGetPendingTasks(p1);
-
- expectPreempted(a1);
- expectPreempted(a2);
-
- control.replay();
- runPreemptor(p1);
- }
-
- // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
- @Test
- public void testPreemptWithLargeOffer() throws Exception {
- schedulingFilter = new SchedulingFilterImpl();
-
- setUpHost(HOST_A, RACK_A);
-
- ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
- a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
- runOnHost(a1, HOST_A);
-
- Offer o1 = makeOffer(OFFER_A, HOST_A, 2, Amount.of(2048L, Data.MB), Amount.of(1L, Data.MB), 1);
- expectOffers(o1);
-
- ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
- p1.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
-
- clock.advance(PREEMPTION_DELAY);
-
- expectGetActiveTasks(a1);
- expectGetPendingTasks(p1);
-
- control.replay();
- runPreemptor(p1);
- }
-
- @Test
- public void testIgnoresThrottledTasks() throws Exception {
- // Ensures that the preemptor does not consider a throttled task to be a preemption candidate.
- schedulingFilter = createMock(SchedulingFilter.class);
-
- Storage storage = MemStorage.newEmptyStorage();
-
- final ScheduledTask throttled = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1").setStatus(THROTTLED);
- throttled.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
- final ScheduledTask pending = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
- pending.getAssignedTask().getTask().setNumCpus(1).setRamMb(1024);
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider store) {
- store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
- IScheduledTask.build(pending),
- IScheduledTask.build(throttled)));
- }
- });
-
- clock.advance(PREEMPTION_DELAY);
-
- control.replay();
-
- PreemptorImpl preemptor = new PreemptorImpl(
- storage,
- stateManager,
- offerQueue,
- schedulingFilter,
- PREEMPTION_DELAY,
- clock,
- statsProvider);
-
- assertEquals(
- Optional.<String>absent(),
- preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
- }
-
- // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
-
- private Offer makeOffer(String offerId,
- String host,
- double cpu,
- Amount<Long, Data> ram,
- Amount<Long, Data> disk,
- int numPorts) {
- List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
- Offer.Builder builder = Offer.newBuilder();
- builder.getIdBuilder().setValue(offerId);
- builder.getFrameworkIdBuilder().setValue("framework-id");
- builder.getSlaveIdBuilder().setValue(hostToId(host));
- builder.setHostname(host);
- for (Resource r: resources) {
- builder.addResources(r);
- }
- return builder.build();
- }
-
- private void expectOffers(Offer ... offers) {
- Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
- .transform(new Function<Offer, HostOffer>() {
- @Override
- public HostOffer apply(Offer offer) {
- return new HostOffer(
- offer,
- IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
- }
- });
- expect(offerQueue.getOffers()).andReturn(hostOffers);
- }
-
- private void expectNoOffers() {
- expect(offerQueue.getOffers()).andReturn(ImmutableList.<HostOffer>of());
- }
-
- private IExpectationSetters<Set<Veto>> expectFiltering() {
- return expect(schedulingFilter.filter(
- EasyMock.<UnusedResource>anyObject(),
- EasyMock.<ResourceRequest>anyObject()))
- .andAnswer(
- new IAnswer<Set<Veto>>() {
- @Override
- public Set<Veto> answer() {
- return ImmutableSet.of();
- }
- });
- }
-
- private void expectPreempted(ScheduledTask preempted) throws Exception {
- expect(stateManager.changeState(
- eq(storageUtil.mutableStoreProvider),
- eq(Tasks.id(preempted)),
- eq(Optional.<ScheduleStatus>absent()),
- eq(ScheduleStatus.PREEMPTING),
- EasyMock.<Optional<String>>anyObject()))
- .andReturn(true);
- }
-
- private ScheduledTask makeProductionTask(String role, String job, String taskId) {
- return makeTask(role, job, taskId, 0, "prod", true);
- }
-
- private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
- return makeTask(role, job, taskId, priority, "prod", true);
- }
-
- private ScheduledTask makeTask(String role, String job, String taskId, int priority,
- String env, boolean production) {
- AssignedTask assignedTask = new AssignedTask()
- .setTaskId(taskId)
- .setTask(new TaskConfig()
- .setJob(new JobKey(role, env, job))
- .setOwner(new Identity(role, role))
- .setPriority(priority)
- .setProduction(production)
- .setJobName(job)
- .setEnvironment(env)
- .setConstraints(new HashSet<Constraint>()));
- ScheduledTask scheduledTask = new ScheduledTask()
- .setStatus(PENDING)
- .setAssignedTask(assignedTask);
- addEvent(scheduledTask, PENDING);
- return scheduledTask;
- }
-
- private ScheduledTask makeTask(String role, String job, String taskId) {
- return makeTask(role, job, taskId, 0, "dev", false);
- }
-
- private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
- return makeTask(role, job, taskId, priority, "dev", false);
- }
-
- private void addEvent(ScheduledTask task, ScheduleStatus status) {
- task.addToTaskEvents(new TaskEvent(clock.nowMillis(), status));
- }
-
- // Slave Hostname to a slave id
- private String hostToId(String hostname) {
- return hostname + "_id";
- }
-
- private void runOnHost(ScheduledTask task, String host) {
- task.setStatus(RUNNING);
- addEvent(task, RUNNING);
- task.getAssignedTask().setSlaveHost(host);
- task.getAssignedTask().setSlaveId(hostToId(host));
- }
-
- private Attribute host(String host) {
- return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
- }
-
- private Attribute rack(String rack) {
- return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
- }
-
- // Sets up a normal host, no dedicated hosts and no maintenance.
- private void setUpHost(String host, String rack) {
- IHostAttributes hostAttrs = IHostAttributes.build(
- new HostAttributes().setHost(host).setSlaveId(host + "_id")
- .setMode(NONE).setAttributes(ImmutableSet.of(rack(rack), host(host))));
-
- expect(this.storageUtil.attributeStore.getHostAttributes(host))
- .andReturn(Optional.of(hostAttrs)).anyTimes();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 17f2d77..5647349 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -37,6 +37,7 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
@@ -116,8 +117,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
new AbstractModule() {
@Override
protected void configure() {
- bind(AsyncModule.PREEMPTOR_KEY).toInstance(preemptor);
- AsyncModule.bindTaskScheduler(binder(), AsyncModule.PREEMPTOR_KEY, reservationDuration);
+ bind(Preemptor.class).toInstance(preemptor);
+ AsyncModule.bindTaskScheduler(binder(), reservationDuration);
bind(OfferQueue.class).toInstance(offerQueue);
bind(StateManager.class).toInstance(stateManager);
bind(TaskAssigner.class).toInstance(assigner);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 012804a..6cc1323 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -45,6 +45,7 @@ import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ecc3fbca/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
new file mode 100644
index 0000000..8f91ff6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link LiveClusterState}: verifies the slave-id-keyed multimap it builds from the
+ * tasks fetched with {@link LiveClusterState#CANDIDATE_QUERY}.
+ */
+public class LiveClusterStateTest extends EasyMockTest {
+
+  private StorageTestUtil storageUtil;
+  private ClusterState clusterState;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    clusterState = new LiveClusterState(storageUtil.storage);
+  }
+
+  // With no tasks supplied to the fetch expectation, the resulting multimap must be empty.
+  @Test
+  public void testEmptyStorage() {
+    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY);
+
+    control.replay();
+
+    ImmutableMultimap<String, IAssignedTask> expected = ImmutableMultimap.of();
+    assertEquals(expected, clusterState.getSlavesToActiveTasks());
+  }
+
+  // Fetched tasks must be grouped under the slave id of their assigned task.
+  @Test
+  public void testGetActiveTasks() {
+    IScheduledTask first = makeTask("a", "1");
+    IScheduledTask second = makeTask("b", "1");
+    IScheduledTask third = makeTask("c", "2");
+
+    storageUtil.expectTaskFetch(LiveClusterState.CANDIDATE_QUERY, first, second, third);
+
+    control.replay();
+
+    ImmutableMultimap<String, IAssignedTask> expected =
+        ImmutableMultimap.<String, IAssignedTask>builder()
+            .put("1", first.getAssignedTask())
+            .put("1", second.getAssignedTask())
+            .put("2", third.getAssignedTask())
+            .build();
+    assertEquals(expected, clusterState.getSlavesToActiveTasks());
+  }
+
+  // Builds a scheduled task carrying only a task id and the id of the slave it is assigned to.
+  private static IScheduledTask makeTask(String taskId, String slaveId) {
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId);
+    return IScheduledTask.build(new ScheduledTask().setAssignedTask(assigned));
+  }
+}