You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/21 00:31:16 UTC

aurora git commit: Updating preemptor to account for revocable offers/tasks

Repository: aurora
Updated Branches:
  refs/heads/master 74a121772 -> f4446a612


Updating preemptor to account for revocable offers/tasks

Bugs closed: AURORA-1418

Reviewed at https://reviews.apache.org/r/37624/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f4446a61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f4446a61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f4446a61

Branch: refs/heads/master
Commit: f4446a61220344c029af669ed0351e086d0a198d
Parents: 74a1217
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Aug 20 15:21:53 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Aug 20 15:21:53 2015 -0700

----------------------------------------------------------------------
 .../scheduler/preemptor/PreemptionVictim.java   |  68 ++++-------
 .../preemptor/PreemptionVictimFilter.java       |  16 ++-
 .../preemptor/PreemptionVictimFilterTest.java   | 117 +++++++++++++++++--
 3 files changed, 143 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 8162323..8f3161a 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -19,66 +19,48 @@ import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * A victim to be considered as a candidate for preemption.
  */
 public final class PreemptionVictim {
-  private final String slaveHost;
-  private final boolean production;
-  private final String role;
-  private final int priority;
-  private final ResourceSlot resourceSlot;
-  private final String taskId;
-
-  private PreemptionVictim(
-      String slaveHost,
-      boolean production,
-      String role,
-      int priority,
-      ResourceSlot resourceSlot,
-      String taskId) {
-
-    this.slaveHost = slaveHost;
-    this.production = production;
-    this.role = role;
-    this.priority = priority;
-    this.resourceSlot = resourceSlot;
-    this.taskId = taskId;
+  private final IAssignedTask task;
+
+  private PreemptionVictim(IAssignedTask task) {
+    this.task = requireNonNull(task);
   }
 
   public static PreemptionVictim fromTask(IAssignedTask task) {
-    ITaskConfig config = task.getTask();
-    return new PreemptionVictim(
-        task.getSlaveHost(),
-        config.isProduction(),
-        config.getJob().getRole(),
-        config.getPriority(),
-        ResourceSlot.from(task.getTask()),
-        task.getTaskId());
+    return new PreemptionVictim(task);
   }
 
   public String getSlaveHost() {
-    return slaveHost;
+    return task.getSlaveHost();
   }
 
   public boolean isProduction() {
-    return production;
+    return task.getTask().isProduction();
   }
 
   public String getRole() {
-    return role;
+    return task.getTask().getJob().getRole();
   }
 
   public int getPriority() {
-    return priority;
+    return task.getTask().getPriority();
   }
 
   public ResourceSlot getResourceSlot() {
-    return resourceSlot;
+    return ResourceSlot.from(task.getTask());
   }
 
   public String getTaskId() {
-    return taskId;
+    return task.getTaskId();
+  }
+
+  public ITaskConfig getConfig() {
+    return task.getTask();
   }
 
   @Override
@@ -88,28 +70,18 @@ public final class PreemptionVictim {
     }
 
     PreemptionVictim other = (PreemptionVictim) o;
-    return Objects.equals(getSlaveHost(), other.getSlaveHost())
-        && Objects.equals(isProduction(), other.isProduction())
-        && Objects.equals(getRole(), other.getRole())
-        && Objects.equals(getPriority(), other.getPriority())
-        && Objects.equals(getResourceSlot(), other.getResourceSlot())
-        && Objects.equals(getTaskId(), other.getTaskId());
+    return Objects.equals(task, other.task);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(slaveHost, production, role, priority, resourceSlot, taskId);
+    return Objects.hashCode(task);
   }
 
   @Override
   public String toString() {
     return com.google.common.base.Objects.toStringHelper(this)
-        .add("slaveHost", getSlaveHost())
-        .add("production", isProduction())
-        .add("role", getRole())
-        .add("priority", getPriority())
-        .add("resourceSlot", getResourceSlot())
-        .add("taskId", getTaskId())
+        .add("task", task)
         .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index a0e71e1..67d7f07 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Sets;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -80,23 +81,26 @@ public interface PreemptionVictimFilter {
     private final SchedulingFilter schedulingFilter;
     private final ExecutorSettings executorSettings;
     private final PreemptorMetrics metrics;
+    private final TierManager tierManager;
 
     @Inject
     PreemptionVictimFilterImpl(
         SchedulingFilter schedulingFilter,
         ExecutorSettings executorSettings,
-        PreemptorMetrics metrics) {
+        PreemptorMetrics metrics,
+        TierManager tierManager) {
 
       this.schedulingFilter = requireNonNull(schedulingFilter);
       this.executorSettings = requireNonNull(executorSettings);
       this.metrics = requireNonNull(metrics);
+      this.tierManager = requireNonNull(tierManager);
     }
 
     private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
         new Function<HostOffer, ResourceSlot>() {
           @Override
           public ResourceSlot apply(HostOffer offer) {
-            return Resources.from(offer.getOffer()).slot();
+            return Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot();
           }
         };
 
@@ -120,7 +124,13 @@ public interface PreemptionVictimFilter {
         new Function<PreemptionVictim, ResourceSlot>() {
           @Override
           public ResourceSlot apply(PreemptionVictim victim) {
-            return victim.getResourceSlot().withOverhead(executorSettings);
+            ResourceSlot slot = victim.getResourceSlot();
+            if (tierManager.getTier(victim.getConfig()).isRevocable()) {
+              // Revocable task CPU cannot be used for preemption purposes as it's a compressible
+              // resource. We can still use RAM, DISK and PORTS as they are not compressible.
+              slot = new ResourceSlot(0.0, slot.getRam(), slot.getDisk(), slot.getNumPorts());
+            }
+            return slot.withOverhead(executorSettings);
           }
         };
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/f4446a61/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 66f20c6..8a1599a 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -20,6 +20,7 @@ import java.util.Set;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
@@ -37,6 +38,8 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
@@ -47,6 +50,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
@@ -55,6 +59,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
 import static org.apache.mesos.Protos.Offer;
@@ -80,11 +85,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   private static final String HOST_ATTRIBUTE = "host";
   private static final String OFFER = "offer";
   private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+  private static final TierInfo DEFAULT_TIER = new TierInfo(false);
 
   private StorageTestUtil storageUtil;
   private SchedulingFilter schedulingFilter;
   private FakeStatsProvider statsProvider;
   private PreemptorMetrics preemptorMetrics;
+  private TierManager tierManager;
 
   @Before
   public void setUp() {
@@ -92,6 +99,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     storageUtil.expectOperations();
     statsProvider = new FakeStatsProvider();
     preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
+    tierManager = createMock(TierManager.class);
   }
 
   private Optional<ImmutableSet<PreemptionVictim>> runFilter(
@@ -103,7 +111,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
         new PreemptionVictimFilter.PreemptionVictimFilterImpl(
             schedulingFilter,
             TaskExecutors.NO_OVERHEAD_EXECUTOR,
-            preemptorMetrics);
+            preemptorMetrics,
+            tierManager);
 
     return filter.filterPreemptionVictims(
         ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
@@ -118,6 +127,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
     assignToHost(lowPriority);
 
@@ -134,6 +144,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
     assignToHost(lowPriority);
 
@@ -153,6 +164,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
     assignToHost(highPriority);
 
@@ -189,6 +201,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
@@ -205,6 +218,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
     ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
@@ -231,6 +245,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   @Test
   public void testProductionPreemptingManyNonProduction() throws Exception {
     schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
 
@@ -253,6 +268,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   @Test
   public void testMinimalSetPreempted() throws Exception {
     schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(9);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
 
@@ -297,6 +313,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   @Test
   public void testPreemptWithOfferAndTask() throws Exception {
     schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
 
     setUpHost();
 
@@ -309,7 +326,78 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
     control.replay();
     assertVictims(
-        runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
+        runFilter(
+            p1,
+            makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+            a1),
+        a1);
+  }
+
+  // Ensures revocable offer resources are filtered out.
+  @Test
+  public void testRevocableOfferFiltered() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertNoVictims(runFilter(
+        p1,
+        makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, true),
+        a1));
+  }
+
+  // Ensures revocable task CPU is not considered for preemption.
+  @Test
+  public void testRevocableVictimsFiltered() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true));
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertNoVictims(runFilter(
+        p1,
+        makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+        a1));
+  }
+
+  // Ensures revocable victim non-compressible resources are still considered.
+  @Test
+  public void testRevocableVictimRamUsed() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(new TierInfo(true));
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(
+        runFilter(
+            p1,
+            makeOffer(OFFER, 2, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1, false),
+            a1),
         a1);
   }
 
@@ -317,6 +405,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   @Test
   public void testPreemptWithOfferAndMultipleTasks() throws Exception {
     schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER).times(5);
 
     setUpHost();
 
@@ -333,7 +422,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
 
     control.replay();
     Optional<HostOffer> offer =
-        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1, false);
     assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
   }
 
@@ -368,6 +457,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
   @Test
   public void testAllVictimsVetoed() {
     schedulingFilter = createMock(SchedulingFilter.class);
+    expect(tierManager.getTier(EasyMock.anyObject())).andReturn(DEFAULT_TIER);
     ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
     assignToHost(task);
 
@@ -410,17 +500,30 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
       double cpu,
       Amount<Long, Data> ram,
       Amount<Long, Data> disk,
-      int numPorts) {
+      int numPorts,
+      boolean revocable) {
 
     List<Resource> resources = new ResourceSlot(cpu, ram, disk, numPorts).toResourceList();
+    if (revocable) {
+      resources = ImmutableList.<Resource>builder()
+          .addAll(FluentIterable.from(resources)
+              .filter(e -> !e.getName().equals(CPUS.getName()))
+              .toList())
+          .add(Protos.Resource.newBuilder()
+              .setName(CPUS.getName())
+              .setType(Protos.Value.Type.SCALAR)
+              .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu))
+              .setRevocable(Resource.RevocableInfo.newBuilder())
+              .build())
+          .build();
+    }
     Offer.Builder builder = Offer.newBuilder();
     builder.getIdBuilder().setValue(offerId);
     builder.getFrameworkIdBuilder().setValue("framework-id");
     builder.getSlaveIdBuilder().setValue(SLAVE_ID);
     builder.setHostname(HOST);
-    for (Resource r: resources) {
-      builder.addResources(r);
-    }
+    builder.addAllResources(resources);
+
     return Optional.of(new HostOffer(
         builder.build(),
         IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));