You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/12/02 20:51:17 UTC
incubator-aurora git commit: Minimize the state consumed when
collecting preemption victims.
Repository: incubator-aurora
Updated Branches:
refs/heads/master cf61e8a39 -> c1b078d56
Minimize the state consumed when collecting preemption victims.
Bugs closed: AURORA-121
Reviewed at https://reviews.apache.org/r/28572/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c1b078d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c1b078d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c1b078d5
Branch: refs/heads/master
Commit: c1b078d56fba7690b5212416fce40c156014cafc
Parents: cf61e8a
Author: Bill Farner <wf...@apache.org>
Authored: Tue Dec 2 11:50:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Dec 2 11:50:49 2014 -0800
----------------------------------------------------------------------
.../scheduler/async/preemptor/ClusterState.java | 4 +-
.../async/preemptor/LiveClusterState.java | 21 ++--
.../async/preemptor/PreemptionVictim.java | 115 +++++++++++++++++++
.../async/preemptor/PreemptorImpl.java | 67 ++++++-----
.../async/preemptor/LiveClusterStateTest.java | 23 +++-
5 files changed, 182 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
index 4f0019a..3524dc5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -15,8 +15,6 @@ package org.apache.aurora.scheduler.async.preemptor;
import com.google.common.collect.Multimap;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-
/**
* A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
*/
@@ -30,5 +28,5 @@ public interface ClusterState {
*
* @return Active tasks and their associated slave IDs.
*/
- Multimap<String, IAssignedTask> getSlavesToActiveTasks();
+ Multimap<String, PreemptionVictim> getSlavesToActiveTasks();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
index 9d83acc..e6bd1b5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterState.java
@@ -18,10 +18,9 @@ import java.util.EnumSet;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import org.apache.aurora.scheduler.base.Query;
@@ -35,14 +34,6 @@ import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
class LiveClusterState implements ClusterState {
- @VisibleForTesting
- static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
- new Function<IAssignedTask, String>() {
- @Override
- public String apply(IAssignedTask input) {
- return input.getSlaveId();
- }
- };
@VisibleForTesting
static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
@@ -56,13 +47,19 @@ class LiveClusterState implements ClusterState {
}
@Override
- public Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
+ public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
// Only non-pending active tasks may be preempted.
Iterable<IAssignedTask> activeTasks = Iterables.transform(
Storage.Util.fetchTasks(storage, CANDIDATE_QUERY),
SCHEDULED_TO_ASSIGNED);
// Group the tasks by slave id so they can be paired with offers from the same slave.
- return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
+ // Choosing to do this iteratively instead of using Multimaps.index/transform to avoid
+ // generating a very large intermediate map.
+ ImmutableMultimap.Builder<String, PreemptionVictim> tasksBySlave = ImmutableMultimap.builder();
+ for (IAssignedTask task : activeTasks) {
+ tasksBySlave.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
+ }
+ return tasksBySlave.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
new file mode 100644
index 0000000..024a689
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * A victim to be considered as a candidate for preemption.
+ */
+public final class PreemptionVictim {
+ private final String slaveHost;
+ private final boolean production;
+ private final String role;
+ private final int priority;
+ private final ResourceSlot resources;
+ private final String taskId;
+
+ private PreemptionVictim(
+ String slaveHost,
+ boolean production,
+ String role,
+ int priority,
+ ResourceSlot resources,
+ String taskId) {
+
+ this.slaveHost = slaveHost;
+ this.production = production;
+ this.role = role;
+ this.priority = priority;
+ this.resources = resources;
+ this.taskId = taskId;
+ }
+
+ public static PreemptionVictim fromTask(IAssignedTask task) {
+ ITaskConfig config = task.getTask();
+ return new PreemptionVictim(
+ task.getSlaveHost(),
+ config.isProduction(),
+ config.getOwner().getRole(),
+ config.getPriority(),
+ ResourceSlot.from(config),
+ task.getTaskId());
+ }
+
+ public String getSlaveHost() {
+ return slaveHost;
+ }
+
+ public boolean isProduction() {
+ return production;
+ }
+
+ public String getRole() {
+ return role;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public ResourceSlot getResources() {
+ return resources;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PreemptionVictim)) {
+ return false;
+ }
+
+ PreemptionVictim other = (PreemptionVictim) o;
+ return Objects.equals(getSlaveHost(), other.getSlaveHost())
+ && Objects.equals(isProduction(), other.isProduction())
+ && Objects.equals(getRole(), other.getRole())
+ && Objects.equals(getPriority(), other.getPriority())
+ && Objects.equals(getResources(), other.getResources())
+ && Objects.equals(getTaskId(), other.getTaskId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(slaveHost, production, role, priority, resources, taskId);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("slaveHost", getSlaveHost())
+ .add("production", isProduction())
+ .add("role", getRole())
+ .add("priority", getPriority())
+ .add("resources", getResources())
+ .add("taskId", getTaskId())
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
index f013383..0204d14 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
@@ -25,7 +25,6 @@ import javax.inject.Qualifier;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -138,14 +137,6 @@ class PreemptorImpl implements Preemptor {
this.clusterState = requireNonNull(clusterState);
}
- private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
- new Function<IAssignedTask, ResourceSlot>() {
- @Override
- public ResourceSlot apply(IAssignedTask task) {
- return ResourceSlot.from(task.getTask());
- }
- };
-
private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
new Function<HostOffer, ResourceSlot>() {
@Override
@@ -170,10 +161,34 @@ class PreemptorImpl implements Preemptor {
}
};
+ private static final Function<PreemptionVictim, ResourceSlot> VICTIM_TO_RESOURCES =
+ new Function<PreemptionVictim, ResourceSlot>() {
+ @Override
+ public ResourceSlot apply(PreemptionVictim victim) {
+ return victim.getResources();
+ }
+ };
+
+ private static final Function<PreemptionVictim, String> VICTIM_TO_TASK_ID =
+ new Function<PreemptionVictim, String>() {
+ @Override
+ public String apply(PreemptionVictim victim) {
+ return victim.getTaskId();
+ }
+ };
+
// TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
// ordering
- private static final Ordering<IAssignedTask> RESOURCE_ORDER =
- ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
+ private static final Ordering<PreemptionVictim> RESOURCE_ORDER =
+ ResourceSlot.ORDER.onResultOf(VICTIM_TO_RESOURCES).reverse();
+
+ private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
+ new Function<PreemptionVictim, String>() {
+ @Override
+ public String apply(PreemptionVictim victim) {
+ return victim.getSlaveHost();
+ }
+ };
/**
* Optional.absent indicates that this slave does not have enough resources to satisfy the task.
@@ -181,7 +196,7 @@ class PreemptorImpl implements Preemptor {
* A set with elements indicates those tasks and the offers are enough.
*/
private Optional<Set<String>> getTasksToPreempt(
- Iterable<IAssignedTask> possibleVictims,
+ Iterable<PreemptionVictim> possibleVictims,
Iterable<HostOffer> offers,
IAssignedTask pendingTask,
AttributeAggregate jobState) {
@@ -189,7 +204,7 @@ class PreemptorImpl implements Preemptor {
// This enforces the precondition that all of the resources are from the same host. We need to
// get the host for the schedulingFilter.
Set<String> hosts = ImmutableSet.<String>builder()
- .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
+ .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
.addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
String host = Iterables.getOnlyElement(hosts);
@@ -216,24 +231,22 @@ class PreemptorImpl implements Preemptor {
}
}
- FluentIterable<IAssignedTask> preemptableTasks = FluentIterable.from(possibleVictims)
- .filter(Predicates.compose(
- preemptionFilter(pendingTask.getTask()),
- Tasks.ASSIGNED_TO_INFO));
+ FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
+ .filter(preemptionFilter(pendingTask.getTask()));
if (preemptableTasks.isEmpty()) {
return Optional.absent();
}
- List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
+ List<PreemptionVictim> toPreemptTasks = Lists.newArrayList();
- Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
+ Iterable<PreemptionVictim> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
- for (IAssignedTask victim : sortedVictims) {
+ for (PreemptionVictim victim : sortedVictims) {
toPreemptTasks.add(victim);
ResourceSlot totalResource = ResourceSlot.sum(
- ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
+ ResourceSlot.sum(Iterables.transform(toPreemptTasks, VICTIM_TO_RESOURCES)),
slackResources);
Optional<IHostAttributes> attributes = getHostAttributes(host);
@@ -248,7 +261,7 @@ class PreemptorImpl implements Preemptor {
if (vetoes.isEmpty()) {
Set<String> taskIds =
- FluentIterable.from(toPreemptTasks).transform(Tasks.ASSIGNED_TO_ID).toSet();
+ FluentIterable.from(toPreemptTasks).transform(VICTIM_TO_TASK_ID).toSet();
return Optional.of(taskIds);
}
}
@@ -293,7 +306,7 @@ class PreemptorImpl implements Preemptor {
return Optional.absent();
}
- Multimap<String, IAssignedTask> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
+ Multimap<String, PreemptionVictim> slavesToActiveTasks = clusterState.getSlavesToActiveTasks();
if (slavesToActiveTasks.isEmpty()) {
return Optional.absent();
@@ -351,10 +364,10 @@ class PreemptorImpl implements Preemptor {
* @return A filter that will compare the priorities and resources required by other tasks
* with {@code preemptableTask}.
*/
- private static Predicate<ITaskConfig> preemptionFilter(final ITaskConfig pendingTask) {
- return new Predicate<ITaskConfig>() {
+ private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
+ return new Predicate<PreemptionVictim>() {
@Override
- public boolean apply(ITaskConfig possibleVictim) {
+ public boolean apply(PreemptionVictim possibleVictim) {
boolean pendingIsProduction = pendingTask.isProduction();
boolean victimIsProduction = possibleVictim.isProduction();
@@ -362,7 +375,7 @@ class PreemptorImpl implements Preemptor {
return true;
} else if (pendingIsProduction == victimIsProduction) {
// If production flags are equal, preemption is based on priority within the same role.
- if (pendingTask.getJob().getRole().equals(possibleVictim.getJob().getRole())) {
+ if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
return pendingTask.getPriority() > possibleVictim.getPriority();
} else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c1b078d5/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
index 8f91ff6..763f8b5 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/LiveClusterStateTest.java
@@ -17,8 +17,9 @@ import com.google.common.collect.ImmutableMultimap;
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.junit.Before;
@@ -45,7 +46,7 @@ public class LiveClusterStateTest extends EasyMockTest {
control.replay();
assertEquals(
- ImmutableMultimap.<String, IAssignedTask>of(),
+ ImmutableMultimap.<String, PreemptionVictim>of(),
clusterState.getSlavesToActiveTasks());
}
@@ -53,7 +54,17 @@ public class LiveClusterStateTest extends EasyMockTest {
return IScheduledTask.build(new ScheduledTask()
.setAssignedTask(new AssignedTask()
.setTaskId(taskId)
- .setSlaveId(slaveId)));
+ .setSlaveId(slaveId)
+ .setSlaveHost(slaveId + "-host")
+ .setTask(new TaskConfig()
+ .setOwner(new Identity("owner", "role"))
+ .setNumCpus(1)
+ .setRamMb(1)
+ .setDiskMb(1))));
+ }
+
+ private static PreemptionVictim fromTask(IScheduledTask task) {
+ return PreemptionVictim.fromTask(task.getAssignedTask());
}
@Test
@@ -67,9 +78,9 @@ public class LiveClusterStateTest extends EasyMockTest {
control.replay();
assertEquals(
- ImmutableMultimap.<String, IAssignedTask>builder()
- .putAll("1", a.getAssignedTask(), b.getAssignedTask())
- .putAll("2", c.getAssignedTask())
+ ImmutableMultimap.<String, PreemptionVictim>builder()
+ .putAll("1", fromTask(a), fromTask(b))
+ .putAll("2", fromTask(c))
.build(),
clusterState.getSlavesToActiveTasks());
}