You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2018/01/09 22:51:09 UTC
[1/2] aurora git commit: Refactor scheduling code to split matching and assigning phases
Repository: aurora
Updated Branches:
refs/heads/master 5b34231ba -> 4e6242fed
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 348386e..b3ffb0d 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -39,13 +39,13 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.TierInfo;
-import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.TaskExecutors;
import org.apache.aurora.scheduler.offers.HostOffer;
+import org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.resources.ResourceType;
@@ -67,9 +67,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
@@ -110,7 +107,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
private SchedulingFilter schedulingFilter;
private FakeStatsProvider statsProvider;
private PreemptorMetrics preemptorMetrics;
- private TierManager tierManager;
private FakeClock clock;
@Before
@@ -119,7 +115,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
storageUtil.expectOperations();
statsProvider = new FakeStatsProvider();
preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
- tierManager = createMock(TierManager.class);
clock = new FakeClock();
ResourceType.initializeEmptyCliArgsForTest();
}
@@ -129,12 +124,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
Optional<HostOffer> offer,
ScheduledTask... victims) {
- PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
- new PreemptionVictimFilter.PreemptionVictimFilterImpl(
- schedulingFilter,
- TaskExecutors.NO_OVERHEAD_EXECUTOR,
- preemptorMetrics,
- tierManager);
+ PreemptionVictimFilterImpl filter = new PreemptionVictimFilterImpl(
+ schedulingFilter,
+ TaskExecutors.NO_OVERHEAD_EXECUTOR,
+ preemptorMetrics,
+ TaskTestUtil.TIER_MANAGER);
return filter.filterPreemptionVictims(
ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
@@ -145,16 +139,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testPreempted() throws Exception {
+ public void testPreempted() {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
assignToHost(lowPriority);
- expectGetTier(lowPriority, DEV_TIER).times(2);
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
- expectGetTier(highPriority, DEV_TIER);
expectFiltering();
@@ -163,7 +155,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testLowestPriorityPreempted() throws Exception {
+ public void testLowestPriorityPreempted() {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
@@ -172,10 +164,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
assignToHost(lowerPriority);
- expectGetTier(lowerPriority, DEV_TIER).atLeastOnce();
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
- expectGetTier(highPriority, DEV_TIER);
expectFiltering();
@@ -184,24 +174,20 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testOnePreemptableTask() throws Exception {
+ public void testOnePreemptableTask() {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
assignToHost(highPriority);
- expectGetTier(highPriority, DEV_TIER);
ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
assignToHost(lowerPriority);
- expectGetTier(lowerPriority, DEV_TIER);
ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
assignToHost(lowestPriority);
- expectGetTier(lowestPriority, DEV_TIER).times(2);
ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
- expectGetTier(pendingPriority, DEV_TIER).times(3);
expectFiltering();
@@ -212,29 +198,25 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testHigherPriorityRunning() throws Exception {
+ public void testHigherPriorityRunning() {
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
assignToHost(highPriority);
- expectGetTier(highPriority, DEV_TIER);
ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
- expectGetTier(task, DEV_TIER);
control.replay();
assertNoVictims(runFilter(task, NO_OFFER, highPriority));
}
@Test
- public void testProductionPreemptingNonproduction() throws Exception {
+ public void testProductionPreemptingNonproduction() {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
- expectGetTier(p1, PREFERRED_TIER);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
- expectGetTier(a1, DEV_TIER).times(2);
assignToHost(a1);
expectFiltering();
@@ -244,16 +226,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+ public void testProductionPreemptingNonproductionAcrossUsers() {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
- expectGetTier(p1, PREFERRED_TIER);
ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
assignToHost(a1);
- expectGetTier(a1, DEV_TIER).times(2);
expectFiltering();
@@ -262,12 +242,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
}
@Test
- public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+ public void testProductionUsersDoNotPreemptEachOther() {
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
- expectGetTier(p1, PREFERRED_TIER);
ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
- expectGetTier(a1, PREFERRED_TIER);
assignToHost(a1);
control.replay();
@@ -276,17 +254,15 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures a production task can preempt 2 tasks on the same host.
@Test
- public void testProductionPreemptingManyNonProduction() throws Exception {
+ public void testProductionPreemptingManyNonProduction() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
- expectGetTier(a1, DEV_TIER).atLeastOnce();
ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
setResource(b1, CPUS, 1.0);
setResource(b1, RAM_MB, 512.0);
- expectGetTier(b1, DEV_TIER).atLeastOnce();
setUpHost();
@@ -296,7 +272,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
setResource(p1, CPUS, 2.0);
setResource(p1, RAM_MB, 1024.0);
- expectGetTier(p1, PREFERRED_TIER).times(2);
control.replay();
assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
@@ -304,12 +279,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures we select the minimal number of tasks to preempt
@Test
- public void testMinimalSetPreempted() throws Exception {
+ public void testMinimalSetPreempted() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
setResource(a1, CPUS, 4.0);
setResource(a1, RAM_MB, 4096.0);
- expectGetTier(a1, DEV_TIER).atLeastOnce();
ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
b1.getAssignedTask().getTask()
@@ -318,12 +292,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
ramMb(512)));
setResource(b1, CPUS, 1.0);
setResource(b1, RAM_MB, 512.0);
- expectGetTier(b1, DEV_TIER).anyTimes();
ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
setResource(b2, CPUS, 1.0);
setResource(b2, RAM_MB, 512.0);
- expectGetTier(b2, DEV_TIER).anyTimes();
setUpHost();
@@ -334,7 +306,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
setResource(p1, CPUS, 2.0);
setResource(p1, RAM_MB, 1024.0);
- expectGetTier(p1, PREFERRED_TIER).times(3);
control.replay();
assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
@@ -342,14 +313,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures a production task *never* preempts a production task from another job.
@Test
- public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+ public void testProductionJobNeverPreemptsProductionJob() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
p1.getAssignedTask().getTask()
.setResources(ImmutableSet.of(
numCpus(2),
ramMb(1024)));
- expectGetTier(p1, PREFERRED_TIER);
setUpHost();
@@ -360,7 +330,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
.setResources(ImmutableSet.of(
numCpus(1),
ramMb(512)));
- expectGetTier(p2, PREFERRED_TIER);
control.replay();
assertNoVictims(runFilter(p2, NO_OFFER, p1));
@@ -368,7 +337,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures that we can preempt if a task + offer can satisfy a pending task.
@Test
- public void testPreemptWithOfferAndTask() throws Exception {
+ public void testPreemptWithOfferAndTask() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
setUpHost();
@@ -379,14 +348,12 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
numCpus(1),
ramMb(512)));
assignToHost(a1);
- expectGetTier(a1, DEV_TIER).times(2);
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
p1.getAssignedTask().getTask()
.setResources(ImmutableSet.of(
numCpus(2),
ramMb(1024)));
- expectGetTier(p1, PREFERRED_TIER);
control.replay();
assertVictims(
@@ -399,7 +366,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures revocable offer resources are filtered out.
@Test
- public void testRevocableOfferFiltered() throws Exception {
+ public void testRevocableOfferFiltered() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
setUpHost();
@@ -408,12 +375,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, DEV_TIER).times(2);
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
setResource(p1, CPUS, 2.0);
setResource(p1, RAM_MB, 1024.0);
- expectGetTier(p1, PREFERRED_TIER);
control.replay();
assertNoVictims(runFilter(
@@ -424,21 +389,20 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures revocable task CPU is not considered for preemption.
@Test
- public void testRevocableVictimsFiltered() throws Exception {
+ public void testRevocableVictimsFiltered() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
setUpHost();
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setTier(TaskTestUtil.REVOCABLE_TIER_NAME);
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, REVOCABLE_TIER).times(2);
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
setResource(p1, CPUS, 2.0);
setResource(p1, RAM_MB, 1024.0);
- expectGetTier(p1, PREFERRED_TIER);
control.replay();
assertNoVictims(runFilter(
@@ -449,7 +413,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures revocable victim non-compressible resources are still considered.
@Test
- public void testRevocableVictimRamUsed() throws Exception {
+ public void testRevocableVictimRamUsed() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
setUpHost();
@@ -458,12 +422,10 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, REVOCABLE_TIER).times(2);
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
setResource(p1, CPUS, 2.0);
setResource(p1, RAM_MB, 1024.0);
- expectGetTier(p1, PREFERRED_TIER);
control.replay();
assertVictims(
@@ -476,7 +438,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
// Ensures we can preempt if two tasks and an offer can satisfy a pending task.
@Test
- public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+ public void testPreemptWithOfferAndMultipleTasks() {
schedulingFilter = new SchedulingFilterImpl(UNAVAILABLITY_THRESHOLD, clock);
setUpHost();
@@ -485,18 +447,15 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, DEV_TIER).atLeastOnce();
ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
setResource(a2, CPUS, 1.0);
setResource(a2, RAM_MB, 512.0);
assignToHost(a2);
- expectGetTier(a2, DEV_TIER).atLeastOnce();
ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
setResource(p1, CPUS, 4.0);
setResource(p1, RAM_MB, 2048.0);
- expectGetTier(p1, PREFERRED_TIER).times(2);
control.replay();
Optional<HostOffer> offer =
@@ -519,13 +478,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
assignToHost(task);
- expectGetTier(task, PREFERRED_TIER);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, DEV_TIER);
expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.empty());
@@ -540,13 +497,11 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
assignToHost(task);
- expectGetTier(task, PREFERRED_TIER);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
setResource(a1, CPUS, 1.0);
setResource(a1, RAM_MB, 512.0);
assignToHost(a1);
- expectGetTier(a1, DEV_TIER).times(2);
setUpHost();
expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
@@ -661,11 +616,6 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
.andAnswer(() -> veto.map(ImmutableSet::of).orElse(ImmutableSet.of()));
}
- private IExpectationSetters<TierInfo> expectGetTier(ScheduledTask task, TierInfo tier) {
- return expect(tierManager.getTier(ITaskConfig.build(task.getAssignedTask().getTask())))
- .andReturn(tier);
- }
-
private static void setResource(ScheduledTask task, ResourceType type, Double value) {
task.getAssignedTask().setTask(ResourceTestUtil.resetResource(
ITaskConfig.build(task.getAssignedTask().getTask()),
@@ -686,17 +636,18 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
.setTask(new TaskConfig()
.setJob(new JobKey(role, env, job))
.setPriority(priority)
+ .setTier(production ? TaskTestUtil.PROD_TIER_NAME : null)
.setProduction(production)
.setConstraints(Sets.newHashSet())
.setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config")));
return new ScheduledTask().setAssignedTask(assignedTask);
}
- static ScheduledTask makeTask(String role, String job, String taskId) {
+ private static ScheduledTask makeTask(String role, String job, String taskId) {
return makeTask(role, job, taskId, 0, "dev", false);
}
- static void addEvent(ScheduledTask task, ScheduleStatus status) {
+ private static void addEvent(ScheduledTask task, ScheduleStatus status) {
task.addToTaskEvents(new TaskEvent(0, status));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
index 5e2fdcb..e6b2b74 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
@@ -16,15 +16,14 @@ package org.apache.aurora.scheduler.scheduling;
import com.google.common.collect.ImmutableList;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.offers.HostOffer;
-import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -32,10 +31,8 @@ import static org.junit.Assert.assertFalse;
public class FirstFitOfferSelectorTest extends EasyMockTest {
private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
- private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
- TASK.getTask(),
- ResourceBag.EMPTY,
- empty());
+ private static final ResourceRequest EMPTY_REQUEST =
+ TaskTestUtil.toResourceRequest(TASK.getTask());
private OfferSelector firstFitOfferSelector;
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
index 78e1269..f84941e 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
@@ -26,28 +26,22 @@ import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.InstanceKeys;
import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
import org.apache.mesos.v1.Protos.TaskID;
import org.apache.mesos.v1.Protos.TaskInfo;
import org.junit.Before;
@@ -55,10 +49,10 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -88,27 +82,22 @@ public class TaskAssignerImplTest extends EasyMockTest {
.setHost(MESOS_OFFER.getHostname())
.setAttributes(ImmutableSet.of(
new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
- private static final IScheduledTask TASK = makeTask("id", JOB);
- private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+ private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+ private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+ .setTaskId(TaskID.newBuilder().setValue(TASK.getTaskId()))
.setAgentId(MESOS_OFFER.getAgentId())
.build();
- private static final IInstanceKey INSTANCE_KEY =
- InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+ private static final IInstanceKey INSTANCE_KEY = InstanceKeys.from(JOB, TASK.getInstanceId());
private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
- private static final HostOffer OFFER_2 = new HostOffer(
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue("offerId0"))
- .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
- .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
- .setHostname("hostName0")
- .addResources(mesosRange(PORTS, PORT))
- .addResources(mesosScalar(CPUS, 1))
- .addResources(mesosScalar(RAM_MB, 1024))
- .build(),
- IHostAttributes.build(new HostAttributes()));
+ private static final Offer MESOS_OFFER_2 =
+ offer("offer-2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
+ private static final HostOffer OFFER_2 =
+ new HostOffer(MESOS_OFFER_2, IHostAttributes.build(new HostAttributes()
+ .setHost(MESOS_OFFER_2.getHostname())
+ .setAttributes(ImmutableSet.of(
+ new Attribute("host", ImmutableSet.of(MESOS_OFFER_2.getHostname()))))));
private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
@@ -120,17 +109,15 @@ public class TaskAssignerImplTest extends EasyMockTest {
private MesosTaskFactory taskFactory;
private OfferManager offerManager;
private TaskAssignerImpl assigner;
- private TierManager tierManager;
private FakeStatsProvider statsProvider;
private UpdateAgentReserver updateAgentReserver;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
storeProvider = createMock(MutableStoreProvider.class);
taskFactory = createMock(MesosTaskFactory.class);
stateManager = createMock(StateManager.class);
offerManager = createMock(OfferManager.class);
- tierManager = createMock(TierManager.class);
updateAgentReserver = createMock(UpdateAgentReserver.class);
statsProvider = new FakeStatsProvider();
// TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing,
@@ -140,43 +127,49 @@ public class TaskAssignerImplTest extends EasyMockTest {
stateManager,
taskFactory,
offerManager,
- tierManager,
updateAgentReserver,
statsProvider,
offerSelector);
aggregate = empty();
- resourceRequest = new ResourceRequest(
- TASK.getAssignedTask().getTask(),
- ResourceBag.EMPTY,
- aggregate);
+ resourceRequest = ResourceRequest.fromTask(
+ TASK.getTask(),
+ NO_OVERHEAD_EXECUTOR,
+ aggregate,
+ TaskTestUtil.TIER_MANAGER);
}
@Test
- public void testAssignNoTasks() throws Exception {
+ public void testAssignNoTasks() {
control.replay();
assertEquals(
NO_ASSIGNMENT,
- assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+ assigner.maybeAssign(
+ storeProvider,
+ resourceRequest,
+ GROUP_KEY,
+ ImmutableSet.of(),
+ NO_RESERVATION));
}
@Test
public void testAssignmentClearedOnError() throws Exception {
- expectNoUpdateReservations(1);
- expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
- .andReturn(ImmutableSet.of(OFFER, OFFER_2));
+ expect(updateAgentReserver.isReserved(anyString())).andReturn(false).atLeastOnce();
+ expect(updateAgentReserver.getAgent(anyObject())).andReturn(Optional.empty()).atLeastOnce();
+
+ expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
+ .andReturn(ImmutableSet.of(OFFER, OFFER_2)).atLeastOnce();
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expectAssignTask(MESOS_OFFER);
expect(stateManager.changeState(
storeProvider,
- Tasks.id(TASK),
+ TASK.getTaskId(),
Optional.of(PENDING),
LOST,
LAUNCH_FAILED_MSG))
.andReturn(StateChangeResult.SUCCESS);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+ expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
.andReturn(TASK_INFO);
control.replay();
@@ -188,9 +181,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
assigner.maybeAssign(
storeProvider,
resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ TaskGroupKey.from(TASK.getTask()),
ImmutableSet.of(
- TASK.getAssignedTask(),
+ TASK,
makeTask("id2", JOB).getAssignedTask(),
makeTask("id3", JOB).getAssignedTask()),
NO_RESERVATION));
@@ -198,10 +191,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
}
@Test
- public void testAssignmentSkippedForReservedSlave() throws Exception {
+ public void testAssignmentSkippedForReservedSlave() {
expectNoUpdateReservations(0);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+ expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
.andReturn(ImmutableSet.of(OFFER));
control.replay();
@@ -211,8 +203,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
assigner.maybeAssign(
storeProvider,
resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
+ TaskGroupKey.from(TASK.getTask()),
+ ImmutableSet.of(TASK),
ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
}
@@ -223,73 +215,68 @@ public class TaskAssignerImplTest extends EasyMockTest {
// and permissive in task->slave direction. In other words, a task with a slave reservation
// should still be tried against other unreserved slaves.
expectNoUpdateReservations(1);
- expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+ expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
.andReturn(ImmutableSet.of(OFFER_2, OFFER));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expectAssignTask(OFFER_2.getOffer());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
+ expect(taskFactory.createFrom(TASK, OFFER_2.getOffer(), false))
.andReturn(TASK_INFO);
offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
control.replay();
assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
+ ImmutableSet.of(TASK.getTaskId()),
assigner.maybeAssign(
storeProvider,
resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
+ TaskGroupKey.from(TASK.getTask()),
+ ImmutableSet.of(TASK),
ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
}
@Test
public void testResourceMapperCallback() {
- AssignedTask builder = TASK.newBuilder().getAssignedTask();
+ AssignedTask builder = TASK.newBuilder();
builder.unsetAssignedPorts();
control.replay();
assertEquals(
- TASK.getAssignedTask(),
+ TASK,
assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
}
@Test
public void testAssignToReservedAgent() throws Exception {
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
- expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+ expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
.andReturn(Optional.of(OFFER));
expectAssignTask(MESOS_OFFER);
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+ expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
.andReturn(TASK_INFO);
control.replay();
assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
+ ImmutableSet.of(TASK.getTaskId()),
assigner.maybeAssign(
storeProvider,
resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+ TaskGroupKey.from(TASK.getTask()),
ImmutableSet.of(
- TASK.getAssignedTask()),
+ TASK),
ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
assertNotEquals(empty(), aggregate);
}
@Test
- public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+ public void testAssignReservedAgentWhenOfferNotReady() {
expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
- expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+ expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
.andReturn(Optional.empty());
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expectLastCall();
control.replay();
@@ -299,55 +286,48 @@ public class TaskAssignerImplTest extends EasyMockTest {
assigner.maybeAssign(
storeProvider,
resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
+ TaskGroupKey.from(TASK.getTask()),
+ ImmutableSet.of(TASK),
ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
assertEquals(empty(), aggregate);
}
@Test
public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
- expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+ expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest))
.andReturn(Optional.of(OFFER));
expectAssignTask(MESOS_OFFER);
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+ expect(taskFactory.createFrom(TASK, MESOS_OFFER, false))
.andReturn(TASK_INFO);
- // Normal scheduling loop for the remaining task...
- IScheduledTask secondTask = makeTask("another-task", JOB, 9999);
+ // Normal scheduling loop for the remaining task.
+ IAssignedTask secondTask = makeTask("another-task", JOB, 9999).getAssignedTask();
TaskInfo secondTaskInfo = TaskInfo.newBuilder()
.setName("another-task")
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask)))
+ .setTaskId(TaskID.newBuilder().setValue(secondTask.getTaskId()))
.setAgentId(MESOS_OFFER.getAgentId())
.build();
expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.empty());
- ImmutableSet<HostOffer> matchingOffers = ImmutableSet.of(OFFER);
- expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
- .andReturn(matchingOffers);
- expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
- .andReturn(ImmutableSet.of());
- expectAssignTask(MESOS_OFFER, secondTask);
- offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo);
- expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false))
- .andReturn(secondTaskInfo);
+ expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest))
+ .andReturn(ImmutableSet.of(OFFER_2));
+ expect(updateAgentReserver.isReserved(OFFER_2.getOffer().getAgentId().getValue()))
+ .andReturn(false);
+ expectAssignTask(MESOS_OFFER_2, secondTask);
+ offerManager.launchTask(MESOS_OFFER_2.getId(), secondTaskInfo);
+ expect(taskFactory.createFrom(secondTask, MESOS_OFFER_2, false)).andReturn(secondTaskInfo);
control.replay();
assertEquals(
- Tasks.ids(TASK, secondTask),
+ ImmutableSet.of(TASK.getTaskId(), secondTask.getTaskId()),
assigner.maybeAssign(
storeProvider,
resourceRequest,
GROUP_KEY,
- ImmutableSet.of(
- TASK.getAssignedTask(),
- secondTask.getAssignedTask()),
+ ImmutableSet.of(TASK, secondTask),
ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
assertNotEquals(empty(), aggregate);
}
@@ -356,19 +336,19 @@ public class TaskAssignerImplTest extends EasyMockTest {
expectAssignTask(offer, TASK);
}
- private void expectAssignTask(Offer offer, IScheduledTask task) {
+ private void expectAssignTask(Offer offer, IAssignedTask task) {
expect(stateManager.assignTask(
eq(storeProvider),
- eq(Tasks.id(task)),
+ eq(task.getTaskId()),
eq(offer.getHostname()),
eq(offer.getAgentId()),
- anyObject())).andReturn(task.getAssignedTask());
+ anyObject())).andReturn(task);
}
private void expectNoUpdateReservations(int offers) {
- expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
- for (int i = 0; i < offers; i++) {
- expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+ if (offers > 0) {
+ expect(updateAgentReserver.isReserved(anyString())).andReturn(false).times(offers);
}
+ expect(updateAgentReserver.getAgent(anyObject())).andReturn(Optional.empty());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index ecf2987..ac2df94 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -31,6 +31,7 @@ import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
@@ -44,8 +45,6 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -62,6 +61,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
import static org.easymock.EasyMock.eq;
@@ -105,7 +105,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
new AbstractModule() {
@Override
protected void configure() {
-
+ bind(TierManager.class).toInstance(TIER_MANAGER);
bind(Executor.class).annotatedWith(AsyncExecutor.class)
.toInstance(MoreExecutors.directExecutor());
bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
@@ -127,28 +127,24 @@ public class TaskSchedulerImplTest extends EasyMockTest {
ImmutableSet.of(task));
}
- private ResourceBag bag(IScheduledTask task) {
- return ResourceManager.bagFromResources(task.getAssignedTask().getTask().getResources())
- .add(THERMOS_EXECUTOR.getExecutorOverhead(task.getAssignedTask()
- .getTask()
- .getExecutorConfig()
- .getName()).get());
- }
-
private IExpectationSetters<Set<String>> expectAssigned(
IScheduledTask task,
Map<String, TaskGroupKey> reservationMap) {
return expect(assigner.maybeAssign(
storageUtil.mutableStoreProvider,
- new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()),
+ ResourceRequest.fromTask(
+ task.getAssignedTask().getTask(),
+ THERMOS_EXECUTOR,
+ empty(),
+ TIER_MANAGER),
TaskGroupKey.from(task.getAssignedTask().getTask()),
ImmutableSet.of(task.getAssignedTask()),
reservationMap));
}
@Test
- public void testSchedule() throws Exception {
+ public void testSchedule() {
storageUtil.expectOperations();
expectAsMap(NO_RESERVATION);
@@ -164,7 +160,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testScheduleNoTask() throws Exception {
+ public void testScheduleNoTask() {
storageUtil.expectOperations();
storageUtil.expectTaskFetch(
Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING),
@@ -178,7 +174,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testSchedulePartial() throws Exception {
+ public void testSchedulePartial() {
storageUtil.expectOperations();
String taskB = "b";
@@ -214,7 +210,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testReservation() throws Exception {
+ public void testReservation() {
storageUtil.expectOperations();
// No reservation available in preemptor
@@ -254,7 +250,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testReservationUnusable() throws Exception {
+ public void testReservationUnusable() {
storageUtil.expectOperations();
expectTaskStillPendingQuery(TASK_A);
@@ -271,7 +267,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testReservationRemoved() throws Exception {
+ public void testReservationRemoved() {
storageUtil.expectOperations();
expectTaskStillPendingQuery(TASK_A);
@@ -288,14 +284,14 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testNonPendingIgnored() throws Exception {
+ public void testNonPendingIgnored() {
control.replay();
eventSink.post(TaskStateChange.transition(TASK_A, RUNNING));
}
@Test
- public void testPendingDeletedHandled() throws Exception {
+ public void testPendingDeletedHandled() {
reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
control.replay();
@@ -325,7 +321,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expectAsMap(NO_RESERVATION);
expect(assigner.maybeAssign(
EasyMock.anyObject(),
- eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())),
+ eq(ResourceRequest.fromTask(
+ taskA.getAssignedTask().getTask(),
+ THERMOS_EXECUTOR,
+ empty(),
+ TIER_MANAGER)),
eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
eq(ImmutableSet.of(taskA.getAssignedTask())),
eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT);
@@ -337,7 +337,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
}
@Test
- public void testScheduleThrows() throws Exception {
+ public void testScheduleThrows() {
storageUtil.expectOperations();
expectAsMap(NO_RESERVATION);
@@ -370,17 +370,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
}
- private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) {
- return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+ private void expectGetReservation(IScheduledTask task, String slaveId) {
+ expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
.andReturn(ImmutableSet.of(slaveId));
}
- private IExpectationSetters<?> expectNoReservation(IScheduledTask task) {
- return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+ private void expectNoReservation(IScheduledTask task) {
+ expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
.andReturn(ImmutableSet.of());
}
- private IExpectationSetters<?> expectAsMap(Map<String, TaskGroupKey> map) {
- return expect(reservations.asMap()).andReturn(map);
+ private void expectAsMap(Map<String, TaskGroupKey> map) {
+ expect(reservations.asMap()).andReturn(map);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
index d8563b8..6eebdb5 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
@@ -21,7 +21,6 @@ import org.apache.aurora.scheduler.updater.UpdateAgentReserver.NullAgentReserver
import org.junit.Test;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
public class NullAgentReserverTest extends EasyMockTest {
private static final IInstanceKey INSTANCE_KEY =
@@ -33,6 +32,6 @@ public class NullAgentReserverTest extends EasyMockTest {
NullAgentReserver reserver = new NullAgentReserver();
reserver.reserve("test", INSTANCE_KEY);
assertFalse(reserver.getAgent(INSTANCE_KEY).isPresent());
- assertTrue(reserver.getReservations("test").isEmpty());
+ assertFalse(reserver.isReserved("test"));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
index 7f17be0..051ac0e 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
@@ -13,25 +13,19 @@
*/
package org.apache.aurora.scheduler.updater;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Resource;
-import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.InstanceKeys;
import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.updater.UpdateAgentReserver.UpdateAgentReserverImpl;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -43,16 +37,6 @@ public class UpdateAgentReserverImplTest extends EasyMockTest {
private static final IInstanceKey INSTANCE_KEY =
InstanceKeys.from(JobKeys.from("role", "env", "name"), 1);
- private TaskGroupKey getTaskGroup(IInstanceKey key) {
- return TaskGroupKey.from(ITaskConfig.build(
- new TaskConfig()
- .setJob(key.getJobKey().newBuilder())
- .setResources(ImmutableSet.of(
- Resource.numCpus(1.0),
- Resource.ramMb(1L),
- Resource.diskMb(1L)))));
- }
-
@Before
public void setUp() {
cache = createMock(new Clazz<BiCache<IInstanceKey, String>>() { });
@@ -76,32 +60,11 @@ public class UpdateAgentReserverImplTest extends EasyMockTest {
}
@Test
- public void testGetReservations() {
+ public void testIsReserved() {
expect(cache.getByValue(AGENT_ID)).andReturn(ImmutableSet.of(INSTANCE_KEY));
+ expect(cache.getByValue(AGENT_ID)).andReturn(ImmutableSet.of());
control.replay();
- assertEquals(ImmutableSet.of(INSTANCE_KEY), reserver.getReservations(AGENT_ID));
+ assertTrue(reserver.isReserved(AGENT_ID));
+ assertFalse(reserver.isReserved(AGENT_ID));
}
-
- @Test
- public void testHasReservations() {
- IInstanceKey instanceKey2 = InstanceKeys.from(JobKeys.from("role", "env", "name"), 2);
- IInstanceKey instanceKey3 = InstanceKeys.from(JobKeys.from("role2", "env2", "name2"), 1);
- expect(cache.asMap())
- .andReturn(ImmutableMap.of(
- INSTANCE_KEY,
- AGENT_ID,
- instanceKey2,
- AGENT_ID,
- instanceKey3,
- "different-agent")).anyTimes();
- control.replay();
- assertTrue(reserver.hasReservations(getTaskGroup(INSTANCE_KEY)));
- assertTrue(reserver.hasReservations(getTaskGroup(instanceKey2)));
- assertTrue(reserver.hasReservations(getTaskGroup(instanceKey3)));
- assertTrue(reserver.hasReservations(
- getTaskGroup(InstanceKeys.from(JobKeys.from("role", "env", "name"), 3))));
- assertFalse(reserver.hasReservations(
- getTaskGroup(InstanceKeys.from(JobKeys.from("not", "in", "map"), 1))));
- }
-
}
[2/2] aurora git commit: Refactor scheduling code to split matching
and assigning phases
Posted by wf...@apache.org.
Refactor scheduling code to split matching and assigning phases
This patch sets the stage for performing the bulk of scheduling work in a
separate call path, without holding the write lock.
Also included is a mechanical refactor pushing the `revocable` flag into
`ResourceRequest` (which was ~always needed as a sibling parameter).
Reviewed at https://reviews.apache.org/r/64954/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e6242fe
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e6242fe
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e6242fe
Branch: refs/heads/master
Commit: 4e6242fed68650e7be906ec3d17ae750f49f8cb8
Parents: 5b34231
Author: Bill Farner <wf...@apache.org>
Authored: Tue Jan 9 14:50:51 2018 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Jan 9 14:50:51 2018 -0800
----------------------------------------------------------------------
.../common/testing/easymock/EasyMockTest.java | 14 +-
.../benchmark/fakes/FakeOfferManager.java | 12 +-
.../apache/aurora/scheduler/TierManager.java | 4 +-
.../aurora/scheduler/base/TaskTestUtil.java | 14 +-
.../executor/ExecutorSettings.java | 21 +-
.../scheduler/filter/SchedulingFilter.java | 42 +++-
.../scheduler/mesos/MesosTaskFactory.java | 7 +-
.../aurora/scheduler/offers/HostOffers.java | 29 +--
.../aurora/scheduler/offers/OfferManager.java | 10 +-
.../scheduler/offers/OfferManagerImpl.java | 12 +-
.../scheduler/preemptor/PreemptionVictim.java | 5 +-
.../preemptor/PreemptionVictimFilter.java | 20 +-
.../scheduler/resources/ResourceManager.java | 6 +
.../scheduler/scheduling/TaskAssigner.java | 2 +-
.../scheduler/scheduling/TaskAssignerImpl.java | 237 +++++++++----------
.../scheduler/scheduling/TaskScheduler.java | 2 +-
.../scheduler/scheduling/TaskSchedulerImpl.java | 100 ++++----
.../aurora/scheduler/storage/TaskStore.java | 3 +-
.../storage/durability/WriteRecorder.java | 3 +-
.../scheduler/updater/UpdateAgentReserver.java | 33 +--
.../aurora/scheduler/TierManagerTest.java | 7 +
.../events/NotifyingSchedulingFilterTest.java | 6 +-
.../filter/SchedulingFilterImplTest.java | 36 ++-
.../mesos/MesosTaskFactoryImplTest.java | 12 +-
.../scheduler/offers/OfferManagerImplTest.java | 51 ++--
.../preemptor/PreemptionVictimFilterTest.java | 101 ++------
.../scheduling/FirstFitOfferSelectorTest.java | 9 +-
.../scheduling/TaskAssignerImplTest.java | 170 ++++++-------
.../scheduling/TaskSchedulerImplTest.java | 56 ++---
.../updater/NullAgentReserverTest.java | 3 +-
.../updater/UpdateAgentReserverImplTest.java | 45 +---
31 files changed, 476 insertions(+), 596 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
index c5500b3..15fc677 100644
--- a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
+++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
@@ -26,6 +26,9 @@ import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
import static org.easymock.EasyMock.createControl;
@@ -38,6 +41,16 @@ import static org.easymock.EasyMock.createControl;
public abstract class EasyMockTest extends TearDownTestCase {
protected IMocksControl control;
+ @Rule
+ public final TestWatcher verifyControl = new TestWatcher() {
+ @Override
+ protected void succeeded(Description description) {
+ // Only attempt to verify the control when the test case otherwise succeeded. This prevents
+ // spurious mock-related error messages that distract from the real error.
+ control.verify();
+ }
+ };
+
/**
* Creates an EasyMock {@link #control} for tests to use that will be automatically
* {@link IMocksControl#verify() verified} on tear down.
@@ -45,7 +58,6 @@ public abstract class EasyMockTest extends TearDownTestCase {
@Before
public final void setupEasyMock() {
control = createControl();
- addTearDown(() -> control.verify());
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f0dacd4..0a105c7 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -15,6 +15,8 @@ package org.apache.aurora.benchmark.fakes;
import java.util.Optional;
+import com.google.common.collect.ImmutableList;
+
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -59,18 +61,14 @@ public class FakeOfferManager implements OfferManager {
}
@Override
- public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
- ResourceRequest resourceRequest,
- boolean revocable) {
-
+ public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
return Optional.empty();
}
@Override
public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
- ResourceRequest resourceRequest,
- boolean revocable) {
+ ResourceRequest resourceRequest) {
- return null;
+ return ImmutableList.of();
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/TierManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java
index c6ad2b1..a37fea4 100644
--- a/src/main/java/org/apache/aurora/scheduler/TierManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler;
import java.util.Map;
+import java.util.Optional;
import javax.inject.Inject;
@@ -102,7 +103,8 @@ public interface TierManager {
!taskConfig.isSetTier() || tierConfig.tiers.containsKey(taskConfig.getTier()),
"Invalid tier '%s' in TaskConfig.", taskConfig.getTier());
- return tierConfig.tiers.get(taskConfig.getTier());
+ return tierConfig.tiers.get(
+ Optional.ofNullable(taskConfig.getTier()).orElse(tierConfig.defaultTier));
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 22d5a64..2b61c27 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,6 +47,8 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -70,12 +72,14 @@ public final class TaskTestUtil {
new TierInfo(true /* preemptible */, false /* revocable */);
public static final TierInfo PREFERRED_TIER =
new TierInfo(false /* preemptible */, false /* revocable */);
+ public static final String REVOCABLE_TIER_NAME = "tier-revocable";
public static final String PROD_TIER_NAME = "tier-prod";
public static final String DEV_TIER_NAME = "tier-dev";
public static final TierConfig TIER_CONFIG =
new TierConfig(DEV_TIER_NAME, ImmutableMap.of(
PROD_TIER_NAME, PREFERRED_TIER,
- DEV_TIER_NAME, DEV_TIER
+ DEV_TIER_NAME, DEV_TIER,
+ REVOCABLE_TIER_NAME, REVOCABLE_TIER
));
public static final TierManager TIER_MANAGER = new TierManager.TierManagerImpl(TIER_CONFIG);
public static final ThriftBackfill THRIFT_BACKFILL = new ThriftBackfill(TIER_MANAGER);
@@ -237,4 +241,12 @@ public final class TaskTestUtil {
new org.apache.aurora.gen.TierConfig("revocable", REVOCABLE_TIER.toMap())
);
}
+
+ public static ResourceRequest toResourceRequest(ITaskConfig task) {
+ return ResourceRequest.fromTask(
+ task,
+ EXECUTOR_SETTINGS,
+ AttributeAggregate.empty(),
+ TIER_MANAGER);
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
index 5c987fd..dac84e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
@@ -19,6 +19,9 @@ import java.util.Optional;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
@@ -26,6 +29,9 @@ import static java.util.Objects.requireNonNull;
* Configuration for the executor to run, and resource overhead required for it.
*/
public class ExecutorSettings {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorSettings.class);
+
private final Map<String, ExecutorConfig> config;
private final boolean populateDiscoveryInfo;
@@ -45,12 +51,19 @@ public class ExecutorSettings {
return populateDiscoveryInfo;
}
- public Optional<ResourceBag> getExecutorOverhead(String name) {
+ public ResourceBag getExecutorOverhead(ITaskConfig task) {
+ if (!task.isSetExecutorConfig()) {
+ // Docker-based tasks don't need executors
+ return ResourceBag.EMPTY;
+ }
+
+ String name = task.getExecutorConfig().getName();
if (config.containsKey(name)) {
- return Optional.of(
- ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList()));
+ return ResourceManager.bagFromMesosResources(
+ config.get(name).getExecutor().getResourcesList());
} else {
- return Optional.empty();
+ LOG.warn("No executor configuration found for " + name);
+ return ResourceBag.EMPTY;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index bd41590..fd97259 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,12 +21,17 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import static java.util.Objects.requireNonNull;
+
import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED;
@@ -306,11 +311,31 @@ public interface SchedulingFilter {
private final ITaskConfig task;
private final ResourceBag request;
private final AttributeAggregate jobState;
+ private final boolean revocable;
+
+ private ResourceRequest(
+ ITaskConfig task,
+ ResourceBag request,
+ AttributeAggregate jobState,
+ boolean revocable) {
- public ResourceRequest(ITaskConfig task, ResourceBag request, AttributeAggregate jobState) {
- this.task = task;
- this.request = request;
- this.jobState = jobState;
+ this.task = requireNonNull(task);
+ this.request = requireNonNull(request);
+ this.jobState = requireNonNull(jobState);
+ this.revocable = revocable;
+ }
+
+ public static ResourceRequest fromTask(
+ ITaskConfig task,
+ ExecutorSettings executorSettings,
+ AttributeAggregate jobState,
+ TierManager tierManager) {
+
+ return new ResourceRequest(
+ task,
+ ResourceManager.bagFromTask(task, executorSettings),
+ jobState,
+ tierManager.getTier(task).isRevocable());
}
public Iterable<IConstraint> getConstraints() {
@@ -329,6 +354,10 @@ public interface SchedulingFilter {
return jobState;
}
+ public boolean isRevocable() {
+ return revocable;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ResourceRequest)) {
@@ -338,12 +367,13 @@ public interface SchedulingFilter {
ResourceRequest other = (ResourceRequest) o;
return Objects.equals(task, other.task)
&& Objects.equals(request, other.request)
- && Objects.equals(jobState, other.jobState);
+ && Objects.equals(jobState, other.jobState)
+ && revocable == other.revocable;
}
@Override
public int hashCode() {
- return Objects.hash(task, request, jobState);
+ return Objects.hash(task, request, jobState, revocable);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index cb288bb..bcb2bbf 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -152,12 +152,7 @@ public interface MesosTaskFactory {
ITaskConfig config = task.getTask();
- // Docker-based tasks don't need executors
- ResourceBag executorOverhead = ResourceBag.EMPTY;
- if (config.isSetExecutorConfig()) {
- executorOverhead =
- executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY);
- }
+ ResourceBag executorOverhead = executorSettings.getExecutorOverhead(config);
AcceptedOffer acceptedOffer;
// TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
index a01c0a8..2ea7a01 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -154,22 +154,13 @@ class HostOffers {
.toSet();
}
- synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
- ResourceRequest resourceRequest,
- boolean revocable) {
+ synchronized Optional<HostOffer> getMatching(
+ Protos.AgentID slaveId,
+ ResourceRequest resourceRequest) {
- Optional<HostOffer> optionalOffer = get(slaveId);
- if (optionalOffer.isPresent()) {
- HostOffer offer = optionalOffer.get();
-
- if (isGloballyBanned(offer)
- || isVetoed(offer, resourceRequest, revocable, Optional.empty())) {
-
- return Optional.empty();
- }
- }
-
- return optionalOffer;
+ return get(slaveId)
+ .filter(offer -> !isGloballyBanned(offer))
+ .filter(offer -> !isVetoed(offer, resourceRequest, Optional.empty()));
}
/**
@@ -182,14 +173,13 @@ class HostOffers {
* @return The offers a given task group can use.
*/
synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
- ResourceRequest resourceRequest,
- boolean revocable) {
+ ResourceRequest resourceRequest) {
return Iterables.unmodifiableIterable(FluentIterable.from(offers)
.filter(o -> !isGloballyBanned(o))
.filter(o -> !isStaticallyBanned(o, groupKey))
.filter(HostOffer::hasCpuAndMem)
- .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+ .filter(o -> !isVetoed(o, resourceRequest, Optional.of(groupKey))));
}
private synchronized boolean isGloballyBanned(HostOffer offer) {
@@ -207,11 +197,10 @@ class HostOffers {
*/
private boolean isVetoed(HostOffer offer,
ResourceRequest resourceRequest,
- boolean revocable,
Optional<TaskGroupKey> groupKey) {
vetoEvaluatedOffers.incrementAndGet();
- UnusedResource unusedResource = new UnusedResource(offer, revocable);
+ UnusedResource unusedResource = new UnusedResource(offer, resourceRequest.isRevocable());
Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
if (!vetoes.isEmpty()) {
if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e90de3e..8f9e33d 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -81,12 +81,9 @@ public interface OfferManager extends EventSubscriber {
*
* @param slaveId Slave ID to get the offer for.
* @param resourceRequest The request that the offer should satisfy.
- * @param revocable Whether or not the request can use revocable resources.
* @return An option containing the offer for the slave ID if it fits.
*/
- Optional<HostOffer> getMatching(AgentID slaveId,
- ResourceRequest resourceRequest,
- boolean revocable);
+ Optional<HostOffer> getMatching(AgentID slaveId, ResourceRequest resourceRequest);
/**
* Gets all offers that the scheduler is holding that satisfy the supplied
@@ -94,12 +91,9 @@ public interface OfferManager extends EventSubscriber {
*
* @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
* @param resourceRequest The request that the offer should satisfy.
- * @param revocable Whether or not the request can use revocable resources.
* @return All held offers that satisfy the supplied resource request.
*/
- Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
- ResourceRequest resourceRequest,
- boolean revocable);
+ Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, ResourceRequest resourceRequest);
/**
* Launches the task matched against the offer.
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
index 8e806b7..084b48c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -162,19 +162,15 @@ public class OfferManagerImpl implements OfferManager {
}
@Override
- public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
- ResourceRequest resourceRequest,
- boolean revocable) {
-
- return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+ public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
+ return hostOffers.getMatching(slaveId, resourceRequest);
}
@Override
public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
- ResourceRequest resourceRequest,
- boolean revocable) {
+ ResourceRequest resourceRequest) {
- return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+ return hostOffers.getAllMatching(groupKey, resourceRequest);
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 69b6866..780689e 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -17,6 +17,7 @@ import java.util.Objects;
import com.google.common.base.MoreObjects;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -54,8 +55,8 @@ public final class PreemptionVictim {
return task.getTask().getPriority();
}
- public ResourceBag getResourceBag() {
- return ResourceManager.bagFromResources(task.getTask().getResources());
+ public ResourceBag getResourceBag(ExecutorSettings executorSettings) {
+ return ResourceManager.bagFromTask(task.getTask(), executorSettings);
}
public String getTaskId() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index cf6d348..569cfe6 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -113,13 +112,7 @@ public interface PreemptionVictimFilter {
new Function<PreemptionVictim, ResourceBag>() {
@Override
public ResourceBag apply(PreemptionVictim victim) {
- ResourceBag bag = victim.getResourceBag();
-
- if (victim.getConfig().isSetExecutorConfig()) {
- // Be pessimistic about revocable resource available if config is not available
- bag.add(executorSettings.getExecutorOverhead(
- victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY));
- }
+ ResourceBag bag = victim.getResourceBag(executorSettings);
if (tierManager.getTier(victim.getConfig()).isRevocable()) {
// Revocable task CPU cannot be used for preemption purposes as it's a compressible
@@ -223,10 +216,8 @@ public interface PreemptionVictimFilter {
return Optional.empty();
}
- ResourceBag overhead = pendingTask.isSetExecutorConfig()
- ? executorSettings.getExecutorOverhead(
- pendingTask.getExecutorConfig().getName()).orElse(EMPTY)
- : EMPTY;
+ ResourceRequest requiredResources =
+ ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager);
ResourceBag totalResource = slackResources;
for (PreemptionVictim victim : sortedVictims) {
@@ -240,10 +231,7 @@ public interface PreemptionVictimFilter {
Set<Veto> vetoes = schedulingFilter.filter(
new UnusedResource(totalResource, attributes.get(), unavailability),
- new ResourceRequest(
- pendingTask,
- ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),
- jobState));
+ requiredResources);
if (vetoes.isEmpty()) {
return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index d093753..2bf9808 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,6 +26,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IResource;
@@ -245,6 +246,11 @@ public final class ResourceManager {
return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE);
}
+ public static ResourceBag bagFromTask(ITaskConfig task, ExecutorSettings executorSettings) {
+ return bagFromResources(task.getResources(), RESOURCE_TO_TYPE, QUANTIFY_RESOURCE)
+ .add(executorSettings.getExecutorOverhead(task));
+ }
+
/**
* Creates a {@link ResourceBag} from Mesos resources.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
index 87619b5..d2597a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -41,6 +41,6 @@ public interface TaskAssigner {
MutableStoreProvider storeProvider,
ResourceRequest resourceRequest,
TaskGroupKey groupKey,
- Iterable<IAssignedTask> tasks,
+ Set<IAssignedTask> tasks,
Map<String, TaskGroupKey> preemptionReservations);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
index 54bd177..ec416cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.scheduling;
+import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -21,22 +22,22 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.InstanceKeys;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.offers.HostOffer;
import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager.LaunchException;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
@@ -64,7 +65,6 @@ public class TaskAssignerImpl implements TaskAssigner {
private final StateManager stateManager;
private final MesosTaskFactory taskFactory;
private final OfferManager offerManager;
- private final TierManager tierManager;
private final UpdateAgentReserver updateAgentReserver;
private final OfferSelector offerSelector;
@@ -73,7 +73,6 @@ public class TaskAssignerImpl implements TaskAssigner {
StateManager stateManager,
MesosTaskFactory taskFactory,
OfferManager offerManager,
- TierManager tierManager,
UpdateAgentReserver updateAgentReserver,
StatsProvider statsProvider,
OfferSelector offerSelector) {
@@ -81,7 +80,6 @@ public class TaskAssignerImpl implements TaskAssigner {
this.stateManager = requireNonNull(stateManager);
this.taskFactory = requireNonNull(taskFactory);
this.offerManager = requireNonNull(offerManager);
- this.tierManager = requireNonNull(tierManager);
this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
this.updateAgentReserver = requireNonNull(updateAgentReserver);
this.offerSelector = requireNonNull(offerSelector);
@@ -99,7 +97,7 @@ public class TaskAssignerImpl implements TaskAssigner {
}
private Protos.TaskInfo assign(
- Storage.MutableStoreProvider storeProvider,
+ MutableStoreProvider storeProvider,
Protos.Offer offer,
String taskId,
boolean revocable) {
@@ -118,20 +116,18 @@ public class TaskAssignerImpl implements TaskAssigner {
}
private void launchUsingOffer(
- Storage.MutableStoreProvider storeProvider,
- boolean revocable,
+ MutableStoreProvider stores,
ResourceRequest resourceRequest,
IAssignedTask task,
- HostOffer offer,
- ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+ HostOffer offer) throws LaunchException {
String taskId = task.getTaskId();
- Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+ Protos.TaskInfo taskInfo =
+ assign(stores, offer.getOffer(), taskId, resourceRequest.isRevocable());
resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
try {
offerManager.launchTask(offer.getOffer().getId(), taskInfo);
- assignmentResult.add(taskId);
- } catch (OfferManager.LaunchException e) {
+ } catch (LaunchException e) {
LOG.warn("Failed to launch task.", e);
launchFailures.incrementAndGet();
@@ -139,147 +135,144 @@ public class TaskAssignerImpl implements TaskAssigner {
// It is in the LOST state and a new task will move to PENDING to replace it.
// Should the state change fail due to storage issues, that's okay. The task will
// time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(
- storeProvider,
- taskId,
- Optional.of(PENDING),
- LOST,
- LAUNCH_FAILED_MSG);
+ stateManager.changeState(stores, taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
throw e;
}
}
- private Iterable<IAssignedTask> maybeAssignReserved(
- Iterable<IAssignedTask> tasks,
- Storage.MutableStoreProvider storeProvider,
- boolean revocable,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- ImmutableSet.Builder<String> assignmentResult) {
+ private static final class ReservationStatus {
+ final boolean taskReserving;
+ final Optional<HostOffer> offer;
- if (!updateAgentReserver.hasReservations(groupKey)) {
- return tasks;
+ private ReservationStatus(boolean taskReserving, Optional<HostOffer> offer) {
+ this.taskReserving = taskReserving;
+ this.offer = requireNonNull(offer);
}
- // Data structure to record which tasks should be excluded from the regular (non-reserved)
- // scheduling loop. This is important because we release reservations once they are used,
- // so we need to record them separately to avoid them being double-scheduled.
- ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
- for (IAssignedTask task : tasks) {
- IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
- Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
- if (maybeAgentId.isPresent()) {
- excludeBuilder.add(key);
- Optional<HostOffer> offer = offerManager.getMatching(
- Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
- resourceRequest,
- revocable);
- if (offer.isPresent()) {
- try {
- // The offer can still be veto'd because of changed constraints, or because the
- // Scheduler hasn't been updated by Mesos yet...
- launchUsingOffer(storeProvider,
- revocable,
- resourceRequest,
- task,
- offer.get(),
- assignmentResult);
- LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
- updateAgentReserver.release(maybeAgentId.get(), key);
- } catch (OfferManager.LaunchException e) {
- updateAgentReserver.release(maybeAgentId.get(), key);
- }
- } else {
- LOG.info(
- "Tried to reuse offer on {} for {}, but was not ready yet.",
- maybeAgentId.get(),
- key);
- }
- }
+ static final ReservationStatus NOT_RESERVING = new ReservationStatus(false, Optional.empty());
+ static final ReservationStatus NOT_READY = new ReservationStatus(true, Optional.empty());
+
+ static ReservationStatus ready(HostOffer offer) {
+ return new ReservationStatus(true, Optional.of(offer));
}
- // Return only the tasks that didn't have reservations. Offers on agents that were reserved
- // might not have been seen by Aurora yet, so we need to wait until the reservation expires
- // before giving up and falling back to the first-fit algorithm.
- Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
- return Iterables.filter(tasks, t -> !toBeExcluded.contains(
- InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+ boolean isTaskReserving() {
+ return taskReserving;
+ }
+
+ Optional<HostOffer> getOffer() {
+ return offer;
+ }
+ }
+
+ private ReservationStatus getReservation(IAssignedTask task, ResourceRequest resourceRequest) {
+
+ IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+ Optional<String> agentId = updateAgentReserver.getAgent(key);
+ if (!agentId.isPresent()) {
+ return ReservationStatus.NOT_RESERVING;
+ }
+ Optional<HostOffer> offer = offerManager.getMatching(
+ Protos.AgentID.newBuilder().setValue(agentId.get()).build(),
+ resourceRequest);
+ if (offer.isPresent()) {
+ LOG.info("Used update reservation for {} on {}", key, agentId.get());
+ updateAgentReserver.release(agentId.get(), key);
+ return ReservationStatus.ready(offer.get());
+ } else {
+ LOG.info(
+ "Tried to reuse offer on {} for {}, but was not ready yet.",
+ agentId.get(),
+ key);
+ return ReservationStatus.NOT_READY;
+ }
}
/**
* Determine whether or not the offer is reserved for a different task via preemption or
* update affinity.
*/
- @SuppressWarnings("PMD.UselessParentheses") // TODO(jly): PMD bug, remove when upgrade from 5.5.3
- private boolean isAgentReserved(HostOffer offer,
- TaskGroupKey groupKey,
- Map<String, TaskGroupKey> preemptionReservations) {
+ private boolean isAgentReserved(
+ HostOffer offer,
+ TaskGroupKey groupKey,
+ Map<String, TaskGroupKey> preemptionReservations) {
String agentId = offer.getOffer().getAgentId().getValue();
- Optional<TaskGroupKey> reservedGroup = Optional.ofNullable(
- preemptionReservations.get(agentId));
+ boolean reservedForPreemption = Optional.ofNullable(preemptionReservations.get(agentId))
+ .map(group -> !group.equals(groupKey))
+ .orElse(false);
- return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
- || !updateAgentReserver.getReservations(agentId).isEmpty();
+ return reservedForPreemption || updateAgentReserver.isReserved(agentId);
}
- @Timed("assigner_maybe_assign")
- @Override
- public Set<String> maybeAssign(
- Storage.MutableStoreProvider storeProvider,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- Iterable<IAssignedTask> tasks,
- Map<String, TaskGroupKey> preemptionReservations) {
+ private static class SchedulingMatch {
+ final IAssignedTask task;
+ final HostOffer offer;
- if (Iterables.isEmpty(tasks)) {
- return ImmutableSet.of();
+ SchedulingMatch(IAssignedTask task, HostOffer offer) {
+ this.task = requireNonNull(task);
+ this.offer = requireNonNull(offer);
}
+ }
- boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
- ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+ private Collection<SchedulingMatch> findMatches(
+ ResourceRequest resourceRequest,
+ TaskGroupKey groupKey,
+ Set<IAssignedTask> tasks,
+ Map<String, TaskGroupKey> preemptionReservations) {
- // Assign tasks reserved for a specific agent (e.g. for update affinity)
- Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
- tasks,
- storeProvider,
- revocable,
- resourceRequest,
- groupKey,
- assignmentResult);
+ // Avoid matching multiple tasks against any offer.
+ Map<String, SchedulingMatch> matchesByOffer = Maps.newHashMap();
- // Assign the rest of the non-reserved tasks
- for (IAssignedTask task : nonReservedTasks) {
- try {
+ tasks.forEach(task -> {
+ ReservationStatus reservation = getReservation(task, resourceRequest);
+ Optional<HostOffer> chosenOffer;
+ if (reservation.isTaskReserving()) {
+ // Use the reserved offer, which may not currently exist.
+ chosenOffer = reservation.getOffer();
+ } else {
// Get all offers that will satisfy the given ResourceRequest and that are not reserved
// for updates or preemption
- FluentIterable<HostOffer> matchingOffers = FluentIterable
- .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
- .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+ Iterable<HostOffer> matchingOffers = Iterables.filter(
+ offerManager.getAllMatching(groupKey, resourceRequest),
+ o -> !matchesByOffer.containsKey(o.getOffer().getId().getValue())
+ && !isAgentReserved(o, groupKey, preemptionReservations));
// Determine which is the optimal offer to select for the given request
- Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
-
- // If no offer is chosen, continue to the next task
- if (!optionalOffer.isPresent()) {
- continue;
- }
-
- // Attempt to launch the task using the chosen offer
- HostOffer offer = optionalOffer.get();
- launchUsingOffer(storeProvider,
- revocable,
- resourceRequest,
- task,
- offer,
- assignmentResult);
- } catch (OfferManager.LaunchException e) {
+ chosenOffer = offerSelector.select(matchingOffers, resourceRequest);
+ }
+
+ chosenOffer.ifPresent(hostOffer -> {
+ matchesByOffer.put(
+ hostOffer.getOffer().getId().getValue(),
+ new SchedulingMatch(task, hostOffer));
+ });
+ });
+
+ return matchesByOffer.values();
+ }
+
+ @Timed("assigner_maybe_assign")
+ @Override
+ public Set<String> maybeAssign(
+ MutableStoreProvider storeProvider,
+ ResourceRequest resourceRequest,
+ TaskGroupKey groupKey,
+ Set<IAssignedTask> tasks,
+ Map<String, TaskGroupKey> reservations) {
+
+ ImmutableSet.Builder<String> assigned = ImmutableSet.builder();
+
+ for (SchedulingMatch match : findMatches(resourceRequest, groupKey, tasks, reservations)) {
+ try {
+ launchUsingOffer(storeProvider, resourceRequest, match.task, match.offer);
+ assigned.add(match.task.getTaskId());
+ } catch (LaunchException e) {
// Any launch exception causes the scheduling round to terminate for this TaskGroup.
break;
}
}
- return assignmentResult.build();
+ return assigned.build();
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 3c38f95..d2f3257 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -31,5 +31,5 @@ public interface TaskScheduler extends EventSubscriber {
* @return Successfully scheduled task IDs. The caller should call schedule again if a given
* task ID was not present in the result.
*/
- Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
+ Set<String> schedule(MutableStoreProvider storeProvider, Set<String> taskIds);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
index cff4ab1..edab03d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
@@ -26,7 +27,6 @@ import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -34,16 +34,17 @@ import com.google.common.eventbus.Subscribe;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -55,10 +56,8 @@ import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toMap;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
/**
* An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
@@ -81,6 +80,7 @@ public class TaskSchedulerImpl implements TaskScheduler {
private final TaskAssigner assigner;
private final Preemptor preemptor;
private final ExecutorSettings executorSettings;
+ private final TierManager tierManager;
private final BiCache<String, TaskGroupKey> reservations;
private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
@@ -92,16 +92,18 @@ public class TaskSchedulerImpl implements TaskScheduler {
TaskAssigner assigner,
Preemptor preemptor,
ExecutorSettings executorSettings,
+ TierManager tierManager,
BiCache<String, TaskGroupKey> reservations) {
this.assigner = requireNonNull(assigner);
this.preemptor = requireNonNull(preemptor);
this.executorSettings = requireNonNull(executorSettings);
+ this.tierManager = requireNonNull(tierManager);
this.reservations = requireNonNull(reservations);
}
@Timed("task_schedule_attempt")
- public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+ public Set<String> schedule(MutableStoreProvider store, Set<String> taskIds) {
try {
return scheduleTasks(store, taskIds);
} catch (RuntimeException e) {
@@ -115,77 +117,67 @@ public class TaskSchedulerImpl implements TaskScheduler {
}
}
- private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
- ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
- String taskIdValues = Joiner.on(",").join(taskIds);
- LOG.debug("Attempting to schedule tasks {}", taskIdValues);
- ImmutableSet<IAssignedTask> assignedTasks =
- ImmutableSet.copyOf(Iterables.transform(
- store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
- IScheduledTask::getAssignedTask));
-
- if (Iterables.isEmpty(assignedTasks)) {
- LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
- return taskIds;
+ private Map<String, IAssignedTask> fetchTasks(StoreProvider store, Set<String> ids) {
+ Map<String, IAssignedTask> tasks = store.getTaskStore()
+ .fetchTasks(Query.taskScoped(ids).byStatus(PENDING))
+ .stream()
+ .map(IScheduledTask::getAssignedTask)
+ .collect(Collectors.toMap(
+ IAssignedTask::getTaskId,
+ Function.identity()
+ ));
+
+ if (ids.size() != tasks.size()) {
+ LOG.warn("Failed to look up tasks "
+ + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
}
+ return tasks;
+ }
- Preconditions.checkState(
- assignedTasks.stream()
- .collect(Collectors.groupingBy(t -> t.getTask()))
- .entrySet()
- .size() == 1,
- "Found multiple task groups for %s",
- taskIdValues);
-
- Map<String, IAssignedTask> assignableTaskMap =
- assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+ private Set<String> scheduleTasks(MutableStoreProvider store, Set<String> ids) {
+ LOG.debug("Attempting to schedule tasks {}", ids);
+ Map<String, IAssignedTask> tasksById = fetchTasks(store, ids);
- if (taskIds.size() != assignedTasks.size()) {
- LOG.warn("Failed to look up tasks "
- + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+ if (tasksById.isEmpty()) {
+ // None of the tasks were found in storage. This could be caused by a task group that was
+ // killed by the user, for example.
+ return ids;
}
- // This is safe after all checks above.
- ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+ // Prepare scheduling context for the tasks
+ ITaskConfig task = Iterables.getOnlyElement(tasksById.values().stream()
+ .map(IAssignedTask::getTask)
+ .collect(Collectors.toSet()));
AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
- // Valid Docker tasks can have a container but no executor config
- ResourceBag overhead = ResourceBag.EMPTY;
- if (task.isSetExecutorConfig()) {
- overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
- .orElseThrow(
- () -> new IllegalArgumentException("Cannot find executor configuration"));
- }
-
+ // Attempt to schedule using available resources.
Set<String> launched = assigner.maybeAssign(
store,
- new SchedulingFilter.ResourceRequest(
- task,
- bagFromResources(task.getResources()).add(overhead), aggregate),
+ ResourceRequest.fromTask(task, executorSettings, aggregate, tierManager),
TaskGroupKey.from(task),
- assignedTasks,
+ ImmutableSet.copyOf(tasksById.values()),
reservations.asMap());
- attemptsFired.addAndGet(assignableTaskMap.size());
- Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+ attemptsFired.addAndGet(tasksById.size());
- failedToLaunch.forEach(taskId -> {
- // Task could not be scheduled.
+ // Fall back to preemption for tasks not scheduled above.
+ Set<String> unassigned = Sets.difference(tasksById.keySet(), launched);
+ unassigned.forEach(taskId -> {
// TODO(maxim): Now that preemption slots are searched asynchronously, consider
// retrying a launch attempt within the current scheduling round IFF a reservation is
// available.
- maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+ maybePreemptFor(tasksById.get(taskId), aggregate, store);
});
- attemptsNoMatch.addAndGet(failedToLaunch.size());
+ attemptsNoMatch.addAndGet(unassigned.size());
// Return all successfully launched tasks as well as those that weren't tried (not in PENDING).
- return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+ return Sets.union(launched, Sets.difference(ids, tasksById.keySet()));
}
private void maybePreemptFor(
IAssignedTask task,
AttributeAggregate jobState,
- Storage.MutableStoreProvider storeProvider) {
+ MutableStoreProvider storeProvider) {
if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 1219215..ebb345d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.storage;
+import java.util.Collection;
import java.util.Optional;
import java.util.Set;
@@ -47,7 +48,7 @@ public interface TaskStore {
* @param query Builder of the query to identify tasks with.
* @return A read-only view of matching tasks.
*/
- Iterable<IScheduledTask> fetchTasks(Query.Builder query);
+ Collection<IScheduledTask> fetchTasks(Query.Builder query);
/**
* Fetches all job keys represented in the task store.
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
index dea8e69..8d70cae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.storage.durability;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -327,7 +328,7 @@ public class WriteRecorder implements
}
@Override
- public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+ public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
return this.taskStore.fetchTasks(query);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
index b6909a6..59eca7b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
@@ -14,12 +14,9 @@
package org.apache.aurora.scheduler.updater;
import java.util.Optional;
-import java.util.Set;
-import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
import org.slf4j.Logger;
@@ -58,21 +55,13 @@ public interface UpdateAgentReserver {
Optional<String> getAgent(IInstanceKey key);
/**
- * Get all reservations for a given agent id. Useful for skipping over the agent between the
+ * Checks whether an agent is currently reserved. Useful for skipping over the agent between the
* reserve/release window.
*
* @param agentId The agent id to look up reservations for.
* @return True if the agent currently has at least one reservation.
*/
- Set<IInstanceKey> getReservations(String agentId);
-
- /**
- * Check if the agent reserver has any reservations for the provided key.
- *
- * @param groupKey The key to check.
- * @return True if there are reservations against any instances in that key.
- */
- boolean hasReservations(TaskGroupKey groupKey);
+ boolean isReserved(String agentId);
/**
* Implementation of the update reserver backed by a BiCache (the same mechanism we use for
@@ -99,16 +88,9 @@ public interface UpdateAgentReserver {
cache.remove(key, agentId);
}
- public Set<IInstanceKey> getReservations(String agentId) {
- return cache.getByValue(agentId);
- }
-
@Override
- public boolean hasReservations(TaskGroupKey groupKey) {
- return cache.asMap().entrySet().stream()
- .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob()))
- .findFirst()
- .isPresent();
+ public boolean isReserved(String agentId) {
+ return !cache.getByValue(agentId).isEmpty();
}
@Override
@@ -137,12 +119,7 @@ public interface UpdateAgentReserver {
}
@Override
- public Set<IInstanceKey> getReservations(String agentId) {
- return ImmutableSet.of();
- }
-
- @Override
- public boolean hasReservations(TaskGroupKey groupKey) {
+ public boolean isReserved(String agentId) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
index 82e40d5..f116e3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
@@ -48,6 +48,13 @@ public class TierManagerTest {
}
@Test
+ public void testDefaultTier() {
+ assertEquals(
+ DEV_TIER,
+ TIER_MANAGER.getTier(ITaskConfig.build(new TaskConfig())));
+ }
+
+ @Test
public void testGetTierRevocableAndProduction() {
assertEquals(
REVOCABLE_TIER,
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 64d7a44..7136711 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -23,13 +23,12 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.TaskVars;
import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.metadata.NearestFit;
-import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -53,8 +52,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
private static final UnusedResource RESOURCE = new UnusedResource(
ResourceManager.bagFromResources(TASK.getResources()),
IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
- private static final ResourceRequest REQUEST =
- new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
+ private static final ResourceRequest REQUEST = TaskTestUtil.toResourceRequest(TASK);
private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0de90d7..21d4e47 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -38,14 +38,13 @@ import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
-import org.apache.aurora.scheduler.mesos.TaskExecutors;
import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IAttribute;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -57,8 +56,10 @@ import org.junit.Test;
import static org.apache.aurora.gen.Resource.diskMb;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -135,22 +136,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(noPortTask, bag(noPortTask), empty())));
+ TaskTestUtil.toResourceRequest(noPortTask)));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(onePortTask, bag(onePortTask), empty())));
+ TaskTestUtil.toResourceRequest(onePortTask)));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
+ TaskTestUtil.toResourceRequest(twoPortTask)));
assertEquals(
ImmutableSet.of(veto(PORTS, 1)),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(threePortTask, bag(threePortTask), empty())));
+ TaskTestUtil.toResourceRequest(threePortTask)));
}
@Test
@@ -238,13 +239,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
DEFAULT_OFFER,
hostAttributes(HOST_A),
Optional.of(start));
- ResourceRequest request = new ResourceRequest(task, bag(task), empty());
control.replay();
assertEquals(
ImmutableSet.of(Veto.maintenance("draining")),
- defaultFilter.filter(unusedResource, request));
+ defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
}
@Test
@@ -262,14 +262,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
DEFAULT_OFFER,
hostAttributes(HOST_A),
Optional.of(start));
- ResourceRequest request = new ResourceRequest(task, bag(task), empty());
control.replay();
assertEquals(
ImmutableSet.of(),
- defaultFilter.filter(unusedResource, request));
-
+ defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
}
@Test
@@ -350,7 +348,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
@Test
- public void testLimitWithinJob() throws Exception {
+ public void testLimitWithinJob() {
control.replay();
AttributeAggregate stateA = AttributeAggregate.create(
@@ -465,7 +463,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ImmutableSet.of(),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(task, bag(task), empty())));
+ TaskTestUtil.toResourceRequest(task)));
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -577,7 +575,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expected,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostAttributes),
- new ResourceRequest(task, bag(task), aggregate))
+ TaskTestUtil.toResourceRequest(task))
.isEmpty());
Constraint negated = constraint.deepCopy();
@@ -587,7 +585,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
!expected,
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostAttributes),
- new ResourceRequest(negatedTask, bag(negatedTask), aggregate))
+ ResourceRequest.fromTask(negatedTask, NO_OVERHEAD_EXECUTOR, aggregate, TIER_MANAGER))
.isEmpty());
return task;
}
@@ -618,7 +616,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ImmutableSet.copyOf(vetoes),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostAttributes),
- new ResourceRequest(task, bag(task), jobState)));
+ ResourceRequest.fromTask(task, NO_OVERHEAD_EXECUTOR, jobState, TIER_MANAGER)));
}
private static IHostAttributes hostAttributes(
@@ -686,10 +684,4 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private ITaskConfig makeTask() {
return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
}
-
- private ResourceBag bag(ITaskConfig task) {
- return ResourceManager.bagFromResources(task.getResources())
- .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead(
- task.getExecutorConfig().getName()).get());
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index c27a662..686087e 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -124,15 +125,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
.setAgentId(SLAVE)
.setHostname("slave-hostname")
.addAllResources(mesosScalarFromBag(bagFromResources(
- TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(
- TASK_CONFIG.getExecutorConfig().getName()).get())))
+ TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
.addResources(mesosRange(PORTS, 80))
.build();
private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
.clearResources()
.addAllResources(mesosScalarFromBag(bagFromResources(
- TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(
- TASK_CONFIG.getExecutorConfig().getName()).get())))
+ TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
.addResources(mesosRange(PORTS, 80))
.build();
@@ -282,10 +281,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
ResourceBag executorResources =
bagFromMesosResources(taskInfo.getExecutor().getResourcesList());
- assertEquals(
- bagFromResources(task.getResources()).add(
- config.getExecutorOverhead(task.getExecutorConfig().getName()).get()),
- taskResources.add(executorResources));
+ assertEquals(ResourceManager.bagFromTask(task, config), taskResources.add(executorResources));
}
private void checkDiscoveryInfoUnset(TaskInfo taskInfo) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 1e9532b..28224a5 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.common.util.testing.FakeTicker;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -37,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -57,7 +57,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
@@ -100,10 +99,8 @@ public class OfferManagerImplTest extends EasyMockTest {
private static final int PORT = 1000;
private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
private static final IScheduledTask TASK = makeTask("id", JOB);
- private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
- TASK.getAssignedTask().getTask(),
- ResourceBag.EMPTY,
- empty());
+ private static final ResourceRequest EMPTY_REQUEST =
+ TaskTestUtil.toResourceRequest(TASK.getAssignedTask().getTask());
private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
@@ -250,14 +247,14 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
offerManager.add(OFFER_A);
assertEquals(OFFER_A,
- Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
// Add static ban.
offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
- assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
}
@Test
@@ -269,14 +266,14 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.add(OFFER_A);
offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
- assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban expires after maximum amount of time an offer is held.
FAKE_TICKER.advance(RETURN_DELAY);
offerManager.cleanupStaticBans();
assertEquals(OFFER_A,
- Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
}
@@ -289,7 +286,7 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.add(OFFER_A);
offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
- assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban is cleared when driver is disconnected.
@@ -297,7 +294,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
offerManager.add(OFFER_A);
assertEquals(OFFER_A,
- Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
}
@Test
@@ -361,14 +358,14 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
- assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
offerManager.cancel(OFFER_A_ID);
offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
assertEquals(OFFER_A,
- Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
}
private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -423,7 +420,7 @@ public class OfferManagerImplTest extends EasyMockTest {
cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getAll()));
}
@@ -460,7 +457,7 @@ public class OfferManagerImplTest extends EasyMockTest {
cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getAll()));
}
@@ -488,7 +485,7 @@ public class OfferManagerImplTest extends EasyMockTest {
cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getAll()));
}
@@ -516,7 +513,7 @@ public class OfferManagerImplTest extends EasyMockTest {
cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getAll()));
}
@@ -555,7 +552,7 @@ public class OfferManagerImplTest extends EasyMockTest {
cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getAll()));
}
@@ -628,7 +625,7 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
offerManager.add(OFFER_A);
assertEquals(Optional.of(OFFER_A),
- offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
}
@Test
@@ -640,7 +637,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
offerManager.ban(OFFER_A_ID);
assertEquals(Optional.empty(),
- offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
}
@@ -655,7 +652,7 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.add(OFFER_A);
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(Optional.empty(),
- offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
}
@@ -669,7 +666,7 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.add(OFFER_C);
assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
- ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
}
@@ -685,7 +682,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
offerManager.ban(OFFER_B.getOffer().getId());
assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
- ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
}
@@ -702,7 +699,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
- ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
@@ -727,7 +724,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(ImmutableSet.of(OFFER_A),
- ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(ImmutableSet.of(empty, OFFER_A),
ImmutableSet.copyOf(offerManager.getAll()));
@@ -750,7 +747,7 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(ImmutableSet.of(OFFER_B),
- ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),