You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text export.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/12/02 20:51:17 UTC

incubator-aurora git commit: Minimize the state consumed when collecting preemption victims.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master cf61e8a39 -> c1b078d56


Minimize the state consumed when collecting preemption victims.

Bugs closed: AURORA-121

Reviewed at https://reviews.apache.org/r/28572/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c1b078d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c1b078d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c1b078d5

Branch: refs/heads/master
Commit: c1b078d56fba7690b5212416fce40c156014cafc
Parents: cf61e8a
Author: Bill Farner <wf...@apache.org>
Authored: Tue Dec 2 11:50:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Dec 2 11:50:49 2014 -0800

----------------------------------------------------------------------
 .../scheduler/async/preemptor/ClusterState.java |   4 +-
 .../async/preemptor/LiveClusterState.java       |  21 ++--
 .../async/preemptor/PreemptionVictim.java       | 115 +++++++++++++++++++
 .../async/preemptor/PreemptorImpl.java          |  67 ++++++-----
 .../async/preemptor/LiveClusterStateTest.java   |  23 +++-
 5 files changed, 182 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
index 4f0019a..3524dc5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -15,8 +15,6 @@ package org.apache.aurora.scheduler.async.preemptor;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-
 /**
  * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
  */
@@ -30,5 +28,5 @@ public interface ClusterState {
    *
    * @return Active tasks and their associated slave IDs.
    */
-  Multimap<String, IAssignedTask> getSlavesToActiveTasks();
+  Multimap<String, PreemptionVictim> getSlavesToActiveTasks();
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
index 9d83acc..e6bd1b5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -18,10 +18,9 @@ import java.util.EnumSet;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
 
 import org.apache.aurora.scheduler.base.Query;
@@ -35,14 +34,6 @@ import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
 import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
 
 class LiveClusterState implements ClusterState {
-  @VisibleForTesting
-  static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
-      new Function<IAssignedTask, String>() {
-        @Override
-        public String apply(IAssignedTask input) {
-          return input.getSlaveId();
-        }
-      };
 
   @VisibleForTesting
   static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
@@ -56,13 +47,19 @@ class LiveClusterState implements ClusterState {
   }
 
   @Override
-  public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
     // Only non-pending active tasks may be preempted.
     Iterable<IAssignedTask> activeTasks = Iterables.transform(
         Storage.Util.fetchTasks(storage, CANDIDATE_QUERY),
         SCHEDULED_TO_ASSIGNED);
 
     // Group the tasks by slave id so they can be paired with offers from the same slave.
-    return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+    // Choosing to do this iteratively instead of using Multimaps.index/transform to avoid
+    // generating a very large intermediate map.
+    ImmutableMultimap.Builder<String, PreemptionVictim> tasksBySlave = ImmutableMultimap.builder();
+    for (IAssignedTask task : activeTasks) {
+      tasksBySlave.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
+    }
+    return tasksBySlave.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
new file mode 100644
index 0000000..024a689
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * A victim to be considered as a candidate for preemption.
+ */
+public final class PreemptionVictim {
+  private final String slaveHost;
+  private final boolean production;
+  private final String role;
+  private final int priority;
+  private final ResourceSlot resources;
+  private final String taskId;
+
+  private PreemptionVictim(
+      String slaveHost,
+      boolean production,
+      String role,
+      int priority,
+      ResourceSlot resources,
+      String taskId) {
+
+    this.slaveHost = slaveHost;
+    this.production = production;
+    this.role = role;
+    this.priority = priority;
+    this.resources = resources;
+    this.taskId = taskId;
+  }
+
+  public static PreemptionVictim fromTask(IAssignedTask task) {
+    ITaskConfig config = task.getTask();
+    return new PreemptionVictim(
+        task.getSlaveHost(),
+        config.isProduction(),
+        config.getOwner().getRole(),
+        config.getPriority(),
+        ResourceSlot.from(config),
+        task.getTaskId());
+  }
+
+  public String getSlaveHost() {
+    return slaveHost;
+  }
+
+  public boolean isProduction() {
+    return production;
+  }
+
+  public String getRole() {
+    return role;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public ResourceSlot getResources() {
+    return resources;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PreemptionVictim)) {
+      return false;
+    }
+
+    PreemptionVictim other = (PreemptionVictim) o;
+    return Objects.equals(getSlaveHost(), other.getSlaveHost())
+        && Objects.equals(isProduction(), other.isProduction())
+        && Objects.equals(getRole(), other.getRole())
+        && Objects.equals(getPriority(), other.getPriority())
+        && Objects.equals(getResources(), other.getResources())
+        && Objects.equals(getTaskId(), other.getTaskId());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(slaveHost, production, role, priority, resources, taskId);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("slaveHost", getSlaveHost())
+        .add("production", isProduction())
+        .add("role", getRole())
+        .add("priority", getPriority())
+        .add("resources", getResources())
+        .add("taskId", getTaskId())
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
index f013383..0204d14 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
@@ -25,7 +25,6 @@ import javax.inject.Qualifier;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -138,14 +137,6 @@ class PreemptorImpl implements Preemptor {
     this.clusterState = requireNonNull(clusterState);
   }
 
-  private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
-      new Function<IAssignedTask, ResourceSlot>() {
-        @Override
-        public ResourceSlot apply(IAssignedTask task) {
-          return ResourceSlot.from(task.getTask());
-        }
-      };
-
   private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
       new Function<HostOffer, ResourceSlot>() {
         @Override
@@ -170,10 +161,34 @@ class PreemptorImpl implements Preemptor {
         }
       };
 
+  private static final Function<PreemptionVictim, ResourceSlot> VICTIM_TO_RESOURCES =
+      new Function<PreemptionVictim, ResourceSlot>() {
+        @Override
+        public ResourceSlot apply(PreemptionVictim victim) {
+          return victim.getResources();
+        }
+      };
+
+  private static final Function<PreemptionVictim, String> VICTIM_TO_TASK_ID =
+      new Function<PreemptionVictim, String>() {
+        @Override
+        public String apply(PreemptionVictim victim) {
+          return victim.getTaskId();
+        }
+      };
+
   // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
   // ordering
-  private static final Ordering<IAssignedTask> RESOURCE_ORDER =
-      ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+  private static final Ordering<PreemptionVictim> RESOURCE_ORDER =
+      ResourceSlot.ORDER.onResultOf(VICTIM_TO_RESOURCES).reverse();
+
+  private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
+      new Function<PreemptionVictim, String>() {
+        @Override
+        public String apply(PreemptionVictim victim) {
+          return victim.getSlaveHost();
+        }
+      };
 
   /**
    * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
@@ -181,7 +196,7 @@ class PreemptorImpl implements Preemptor {
    * A set with elements indicates those tasks and the offers are enough.
    */
   private Optional<Set<String>> getTasksToPreempt(
-      Iterable<IAssignedTask> possibleVictims,
+      Iterable<PreemptionVictim> possibleVictims,
       Iterable<HostOffer> offers,
       IAssignedTask pendingTask,
       AttributeAggregate jobState) {
@@ -189,7 +204,7 @@ class PreemptorImpl implements Preemptor {
     // This enforces the precondition that all of the resources are from the same host. We need to
     // get the host for the schedulingFilter.
     Set<String> hosts = ImmutableSet.<String>builder()
-        .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
+        .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
         .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
 
     String host = Iterables.getOnlyElement(hosts);
@@ -216,24 +231,22 @@ class PreemptorImpl implements Preemptor {
       }
     }
 
-    FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
-        .filter(Predicates.compose(
-            preemptionFilter(pendingTask.getTask()),
-            Tasks.ASSIGNED_TO_INFO));
+    FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
+        .filter(preemptionFilter(pendingTask.getTask()));
 
     if (preemptableTasks.isEmpty()) {
       return Optional.absent();
     }
 
-    List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+    List<PreemptionVictim> toPreemptTasks = Lists.newArrayList();
 
-    Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+    Iterable<PreemptionVictim> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
 
-    for (IAssignedTask victim : sortedVictims) {
+    for (PreemptionVictim victim : sortedVictims) {
       toPreemptTasks.add(victim);
 
       ResourceSlot totalResource = ResourceSlot.sum(
-          ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+          ResourceSlot.sum(Iterables.transform(toPreemptTasks, VICTIM_TO_RESOURCES)),
           slackResources);
 
       Optional<IHostAttributes> attributes = getHostAttributes(host);
@@ -248,7 +261,7 @@ class PreemptorImpl implements Preemptor {
 
       if (vetoes.isEmpty()) {
         Set<String> taskIds =
-            FluentIterable.from(toPreemptTasks).transform(Tasks.ASSIGNED_TO_ID).toSet();
+            FluentIterable.from(toPreemptTasks).transform(VICTIM_TO_TASK_ID).toSet();
         return Optional.of(taskIds);
       }
     }
@@ -293,7 +306,7 @@ class PreemptorImpl implements Preemptor {
       return Optional.absent();
     }
 
-    Multimap<String, IAssignedTask> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
+    Multimap<String, PreemptionVictim> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
 
     if (slavesToActiveTasks.isEmpty()) {
       return Optional.absent();
@@ -351,10 +364,10 @@ class PreemptorImpl implements Preemptor {
    * @return A filter that will compare the priorities and resources required by other tasks
    *     with {@code preemptableTask}.
    */
-  private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
-    return new Predicate<ITaskConfig>() {
+  private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
+    return new Predicate<PreemptionVictim>() {
       @Override
-      public boolean apply(ITaskConfig possibleVictim) {
+      public boolean apply(PreemptionVictim possibleVictim) {
         boolean pendingIsProduction = pendingTask.isProduction();
         boolean victimIsProduction = possibleVictim.isProduction();
 
@@ -362,7 +375,7 @@ class PreemptorImpl implements Preemptor {
           return true;
         } else if (pendingIsProduction == victimIsProduction) {
           // If production flags are equal, preemption is based on priority within the same role.
-          if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+          if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
             return pendingTask.getPriority() > possibleVictim.getPriority();
           } else {
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
index 8f91ff6..763f8b5 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
@@ -17,8 +17,9 @@ import com.google.common.collect.ImmutableMultimap;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
@@ -45,7 +46,7 @@ public class LiveClusterStateTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        ImmutableMultimap.<String, IAssignedTask>of(),
+        ImmutableMultimap.<String, PreemptionVictim>of(),
         clusterState.getSlavesToActiveTasks());
   }
 
@@ -53,7 +54,17 @@ public class LiveClusterStateTest extends EasyMockTest {
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()
             .setTaskId(taskId)
-            .setSlaveId(slaveId)));
+            .setSlaveId(slaveId)
+            .setSlaveHost(slaveId + "-host")
+            .setTask(new TaskConfig()
+                .setOwner(new Identity("owner", "role"))
+                .setNumCpus(1)
+                .setRamMb(1)
+                .setDiskMb(1))));
+  }
+
+  private static PreemptionVictim fromTask(IScheduledTask task) {
+    return PreemptionVictim.fromTask(task.getAssignedTask());
   }
 
   @Test
@@ -67,9 +78,9 @@ public class LiveClusterStateTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        ImmutableMultimap.<String, IAssignedTask>builder()
-            .putAll("1", a.getAssignedTask(), b.getAssignedTask())
-            .putAll("2", c.getAssignedTask())
+        ImmutableMultimap.<String, PreemptionVictim>builder()
+            .putAll("1", fromTask(a), fromTask(b))
+            .putAll("2", fromTask(c))
             .build(),
         clusterState.getSlavesToActiveTasks());
   }