You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/21 00:31:16 UTC
aurora git commit: Updating preemptor to account for revocable offers/tasks
Repository: aurora
Updated Branches:
refs/heads/master 74a121772 -> f4446a612
Updating preemptor to account for revocable offers/tasks
Bugs closed: AURORA-1418
Reviewed at https://reviews.apache.org/r/37624/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f4446a61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f4446a61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f4446a61
Branch: refs/heads/master
Commit: f4446a61220344c029af669ed0351e086d0a198d
Parents: 74a1217
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Aug 20 15:21:53 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Aug 20 15:21:53 2015 -0700
----------------------------------------------------------------------
.../scheduler/preemptor/PreemptionVictim.java | 68 ++++-------
.../preemptor/PreemptionVictimFilter.java | 16 ++-
.../preemptor/PreemptionVictimFilterTest.java | 117 +++++++++++++++++--
3 files changed, 143 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 8162323..8f3161a 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -19,66 +19,48 @@ import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import static java.util.Objects.requireNonNull;
+
/**
* A victim to be considered as a candidate for preemption.
*/
public final class PreemptionVictim {
- private final String slaveHost;
- private final boolean production;
- private final String role;
- private final int priority;
- private final ResourceSlot resourceSlot;
- private final String taskId;
-
- private PreemptionVictim(
- String slaveHost,
- boolean production,
- String role,
- int priority,
- ResourceSlot resourceSlot,
- String taskId) {
-
- this.slaveHost = slaveHost;
- this.production = production;
- this.role = role;
- this.priority = priority;
- this.resourceSlot = resourceSlot;
- this.taskId = taskId;
+ private final IAssignedTask task;
+
+ private PreemptionVictim(IAssignedTask task) {
+ this.task = requireNonNull(task);
}
public static PreemptionVictim fromTask(IAssignedTask task) {
- ITaskConfig config = task.getTask();
- return new PreemptionVictim(
- task.getSlaveHost(),
- config.isProduction(),
- config.getJob().getRole(),
- config.getPriority(),
- ResourceSlot.from(task.getTask()),
- task.getTaskId());
+ return new PreemptionVictim(task);
}
public String getSlaveHost() {
- return slaveHost;
+ return task.getSlaveHost();
}
public boolean isProduction() {
- return production;
+ return task.getTask().isProduction();
}
public String getRole() {
- return role;
+ return task.getTask().getJob().getRole();
}
public int getPriority() {
- return priority;
+ return task.getTask().getPriority();
}
public ResourceSlot getResourceSlot() {
- return resourceSlot;
+ return ResourceSlot.from(task.getTask());
}
public String getTaskId() {
- return taskId;
+ return task.getTaskId();
+ }
+
+ public ITaskConfig getConfig() {
+ return task.getTask();
}
@Override
@@ -88,28 +70,18 @@ public final class PreemptionVictim {
}
PreemptionVictim other = (PreemptionVictim) o;
- return Objects.equals(getSlaveHost(), other.getSlaveHost())
- && Objects.equals(isProduction(), other.isProduction())
- && Objects.equals(getRole(), other.getRole())
- && Objects.equals(getPriority(), other.getPriority())
- && Objects.equals(getResourceSlot(), other.getResourceSlot())
- && Objects.equals(getTaskId(), other.getTaskId());
+ return Objects.equals(task, other.task);
}
@Override
public int hashCode() {
- return Objects.hash(slaveHost, production, role, priority, resourceSlot, taskId);
+ return Objects.hashCode(task);
}
@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(this)
- .add("slaveHost", getSlaveHost())
- .add("production", isProduction())
- .add("role", getRole())
- .add("priority", getPriority())
- .add("resourceSlot", getResourceSlot())
- .add("taskId", getTaskId())
+ .add("task", task)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index a0e71e1..67d7f07 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Sets;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -80,23 +81,26 @@ public interface PreemptionVictimFilter {
private final SchedulingFilter schedulingFilter;
private final ExecutorSettings executorSettings;
private final PreemptorMetrics metrics;
+ private final TierManager tierManager;
@Inject
PreemptionVictimFilterImpl(
SchedulingFilter schedulingFilter,
ExecutorSettings executorSettings,
- PreemptorMetrics metrics) {
+ PreemptorMetrics metrics,
+ TierManager tierManager) {
this.schedulingFilter = requireNonNull(schedulingFilter);
this.executorSettings = requireNonNull(executorSettings);
this.metrics = requireNonNull(metrics);
+ this.tierManager = requireNonNull(tierManager);
}
private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
new Function<HostOffer, ResourceSlot>() {
@Override
public ResourceSlot apply(HostOffer offer) {
- return Resources.from(offer.getOffer()).slot();
+ return Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot();
}
};
@@ -120,7 +124,13 @@ public interface PreemptionVictimFilter {
new Function<PreemptionVictim, ResourceSlot>() {
@Override
public ResourceSlot apply(PreemptionVictim victim) {
- return victim.getResourceSlot().withOverhead(executorSettings);
+ ResourceSlot slot = victim.getResourceSlot();
+ if (tierManager.getTier(victim.getConfig()).isRevocable()) {
+ // Revocable task CPU cannot be used for preemption purposes as it's a compressible
+ // resource. We can still use RAM, DISK and PORTS as they are not compressible.
+ slot = new ResourceSlot(0.0, slot.getRam(), slot.getDisk(), slot.getNumPorts());
+ }
+ return slot.withOverhead(executorSettings);
}
};
http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 66f20c6..8a1599a 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -20,6 +20,7 @@ import java.util.Set;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
@@ -37,6 +38,8 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
@@ -47,6 +50,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
@@ -55,6 +59,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
import static org.apache.mesos.Protos.Offer;
@@ -80,11 +85,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
private static final String HOST_ATTRIBUTE = "host";
private static final String OFFER = "offer";
private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+ private static final TierInfo DEFAULT_TIER = new TierInfo(false);
private StorageTestUtil storageUtil;
private SchedulingFilter schedulingFilter;
private FakeStatsProvider statsProvider;
private PreemptorMetrics preemptorMetrics;
+ private TierManager tierManager;
@Before
public void setUp() {
@@ -92,6 +99,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
storageUtil.expectOperations();
statsProvider = new FakeStatsProvider();
preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
+ tierManager = createMock(TierManager.class);
}
private Optional<ImmutableSet<PreemptionVictim>> runFilter(
@@ -103,7 +111,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
new PreemptionVictimFilter.PreemptionVictimFilterImpl(
schedulingFilter,
TaskExecutors.NO_OVERHEAD_EXECUTOR,
- preemptorMetrics);
+ preemptorMetrics,
+ tierManager);
return filter.filterPreemptionVictims(
ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
@@ -118,6 +127,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
assignToHost(lowPriority);
@@ -134,6 +144,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
assignToHost(lowPriority);
@@ -153,6 +164,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
assignToHost(highPriority);
@@ -189,6 +201,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
@@ -205,6 +218,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
setUpHost();
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
@@ -231,6 +245,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
@Test
public void testProductionPreemptingManyNonProduction() throws Exception {
schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
@@ -253,6 +268,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
@Test
public void testMinimalSetPreempted() throws Exception {
schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(9);
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
@@ -297,6 +313,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
@Test
public void testPreemptWithOfferAndTask() throws Exception {
schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
setUpHost();
@@ -309,7 +326,78 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
control.replay();
assertVictims(
- runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
+ runFilter(
+ p1,
+ makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+ a1),
+ a1);
+ }
+
+ // Ensures revocable offer resources are filtered out.
+ @Test
+ public void testRevocableOfferFiltered() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
+
+ setUpHost();
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ assignToHost(a1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ control.replay();
+ assertNoVictims(runFilter(
+ p1,
+ makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, true),
+ a1));
+ }
+
+ // Ensures revocable task CPU is not considered for preemption.
+ @Test
+ public void testRevocableVictimsFiltered() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true));
+
+ setUpHost();
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ assignToHost(a1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ control.replay();
+ assertNoVictims(runFilter(
+ p1,
+ makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+ a1));
+ }
+
+ // Ensures revocable victim non-compressible resources are still considered.
+ @Test
+ public void testRevocableVictimRamUsed() throws Exception {
+ schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true));
+
+ setUpHost();
+
+ ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+ a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+ assignToHost(a1);
+
+ ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+ p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+ control.replay();
+ assertVictims(
+ runFilter(
+ p1,
+ makeOffer(OFFER, 2, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+ a1),
a1);
}
@@ -317,6 +405,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
@Test
public void testPreemptWithOfferAndMultipleTasks() throws Exception {
schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5);
setUpHost();
@@ -333,7 +422,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
control.replay();
Optional<HostOffer> offer =
- makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+ makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1, false);
assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
}
@@ -368,6 +457,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
@Test
public void testAllVictimsVetoed() {
schedulingFilter = createMock(SchedulingFilter.class);
+ expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
assignToHost(task);
@@ -410,17 +500,30 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
double cpu,
Amount<Long, Data> ram,
Amount<Long, Data> disk,
- int numPorts) {
+ int numPorts,
+ boolean revocable) {
List<Resource> resources = new ResourceSlot(cpu, ram, disk, numPorts).toResourceList();
+ if (revocable) {
+ resources = ImmutableList.<Resource>builder()
+ .addAll(FluentIterable.from(resources)
+ .filter(e -> !e.getName().equals(CPUS.getName()))
+ .toList())
+ .add(Protos.Resource.newBuilder()
+ .setName(CPUS.getName())
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu))
+ .setRevocable(Resource.RevocableInfo.newBuilder())
+ .build())
+ .build();
+ }
Offer.Builder builder = Offer.newBuilder();
builder.getIdBuilder().setValue(offerId);
builder.getFrameworkIdBuilder().setValue("framework-id");
builder.getSlaveIdBuilder().setValue(SLAVE_ID);
builder.setHostname(HOST);
- for (Resource r: resources) {
- builder.addResources(r);
- }
+ builder.addAllResources(resources);
+
return Optional.of(new HostOffer(
builder.build(),
IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));