You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/01/14 19:48:03 UTC
aurora git commit: Accept resource offers from multiple framework
roles.
Repository: aurora
Updated Branches:
refs/heads/master 952ef6db3 -> a80260eaf
Accept resource offers from multiple framework roles.
Bugs closed: AURORA-1109
Reviewed at https://reviews.apache.org/r/42126/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a80260ea
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a80260ea
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a80260ea
Branch: refs/heads/master
Commit: a80260eafcd652e75706f9ad5a5a1886c7b051ef
Parents: 952ef6d
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Jan 14 10:43:32 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Jan 14 10:43:32 2016 -0800
----------------------------------------------------------------------
NEWS | 2 +
.../apache/aurora/scheduler/AcceptedOffer.java | 234 ++++++++++++++
.../apache/aurora/scheduler/ResourceSlot.java | 50 ++-
.../org/apache/aurora/scheduler/Resources.java | 37 +--
.../mesos/CommandLineDriverSettingsModule.java | 19 +-
.../scheduler/mesos/MesosTaskFactory.java | 58 ++--
.../aurora/scheduler/state/TaskAssigner.java | 2 +-
.../aurora/scheduler/AcceptedOfferTest.java | 303 +++++++++++++++++++
.../aurora/scheduler/ResourceSlotTest.java | 14 +-
.../CommandLineDriverSettingsModuleTest.java | 23 +-
.../mesos/MesosTaskFactoryImplTest.java | 61 +++-
.../scheduler/state/TaskAssignerImplTest.java | 8 +-
12 files changed, 714 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index acaff9e..809077f 100644
--- a/NEWS
+++ b/NEWS
@@ -21,6 +21,8 @@
at http://logback.qos.ch/manual/configuration.html
With this change, we have removed the following scheduler command line arguments as they were
made redundant: `logtostderr`, `alsologtostderr`, `vlog`, `vmodule`, and `use_glog_formatter`.
+- Added support for configuring Mesos role by passing `-mesos_role` to Aurora scheduler at start time.
+ This enables resource reservation for Aurora when running in a shared Mesos cluster.
0.11.0
------
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
new file mode 100644
index 0000000..9c2dc0b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.scheduler.base.Numbers;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo.
+ */
+public final class AcceptedOffer {
+
+ public static final String DEFAULT_ROLE_NAME = "*";
+
+ /**
+ * Reserved resource filter.
+ */
+ public static final Predicate<Resource> RESERVED =
+ e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME);
+
+ /**
+ * Non reserved resource filter.
+ */
+ public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED);
+
+ /**
+ * Helper function to check a resource value is small enough to be considered zero.
+ */
+ public static boolean nearZero(double value) {
+ return Math.abs(value) < EPSILON;
+ }
+
+ /**
+ * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources.
+ * @return A list of Resource used for TaskInfo.
+ */
+ public List<Resource> getTaskResources() {
+ return taskResources;
+ }
+
+ /**
+ * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources.
+ * @return A list of Resource used for ExecutorInfo.
+ */
+ public List<Resource> getExecutorResources() {
+ return executorResources;
+ }
+
+ /**
+ * Use this epsilon value to avoid comparison with zero.
+ */
+ private static final double EPSILON = 1e-6;
+
+ private final List<Resource> taskResources;
+ private final List<Resource> executorResources;
+
+ public static AcceptedOffer create(
+ Offer offer,
+ ResourceSlot taskSlot,
+ ResourceSlot executorSlot,
+ Set<Integer> selectedPorts,
+ TierInfo tierInfo) throws Resources.InsufficientResourcesException {
+
+ List<Resource> reservedFirst = ImmutableList.<Resource>builder()
+ .addAll(Iterables.filter(offer.getResourcesList(), RESERVED))
+ .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED))
+ .build();
+
+ boolean revocable = tierInfo.isRevocable();
+ List<Resource.Builder> cpuResources = filterToBuilders(
+ reservedFirst,
+ ResourceType.CPUS.getName(),
+ revocable ? Resources.REVOCABLE : Resources.NON_REVOCABLE);
+ List<Resource.Builder> memResources = filterToBuilderNonRevocable(
+ reservedFirst, ResourceType.RAM_MB.getName());
+ List<Resource.Builder> diskResources = filterToBuilderNonRevocable(
+ reservedFirst, ResourceType.DISK_MB.getName());
+ List<Resource.Builder> portsResources = filterToBuilderNonRevocable(
+ reservedFirst, ResourceType.PORTS.getName());
+
+ List<Resource> taskResources = ImmutableList.<Resource>builder()
+ .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable))
+ .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false))
+ .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false))
+ .addAll(allocateRangeType(portsResources, selectedPorts))
+ .build();
+
+ List<Resource> executorResources = ImmutableList.<Resource>builder()
+ .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable))
+ .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false))
+ .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false))
+ .build();
+
+ return new AcceptedOffer(taskResources, executorResources);
+ }
+
+ private AcceptedOffer(
+ List<Resource> taskResources,
+ List<Resource> executorResources) {
+
+ this.taskResources = requireNonNull(taskResources);
+ this.executorResources = requireNonNull(executorResources);
+ }
+
+ private static List<Resource> allocateRangeType(
+ List<Resource.Builder> from,
+ Set<Integer> valueSet) throws Resources.InsufficientResourcesException {
+
+ Set<Integer> leftOver = Sets.newHashSet(valueSet);
+ ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder();
+ for (Resource.Builder r : from) {
+ Set<Integer> fromResource = Sets.newHashSet(Iterables.concat(
+ Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS)));
+ Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource));
+ if (available.isEmpty()) {
+ continue;
+ }
+ Resource newResource = makeMesosRangeResource(r.build(), available);
+ result.add(newResource);
+ leftOver.removeAll(available);
+ if (leftOver.isEmpty()) {
+ break;
+ }
+ }
+ if (!leftOver.isEmpty()) {
+ // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+ // consistent.
+ // Maybe we should consider implementing resource veto with this class to ensure that.
+ throw new Resources.InsufficientResourcesException(
+ "Insufficient resource for range type when allocating from offer");
+ }
+ return result.build();
+ }
+
+ /**
+ * Creates a mesos resource of integer ranges from given prototype.
+ *
+ * @param prototype Resource prototype.
+ * @param values Values to translate into ranges.
+ * @return A new mesos ranges resource.
+ */
+ static Resource makeMesosRangeResource(
+ Resource prototype,
+ Set<Integer> values) {
+
+ return Protos.Resource.newBuilder(prototype)
+ .setRanges(Protos.Value.Ranges.newBuilder()
+ .addAllRange(
+ Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM)))
+ .build();
+ }
+
+ private static List<Resource> allocateScalarType(
+ List<Resource.Builder> from,
+ double amount,
+ boolean revocable) throws Resources.InsufficientResourcesException {
+
+ double remaining = amount;
+ ImmutableList.Builder<Resource> result = ImmutableList.builder();
+ for (Resource.Builder r : from) {
+ if (nearZero(remaining)) {
+ break;
+ }
+ final double available = r.getScalar().getValue();
+ if (nearZero(available)) {
+ // Skip resource slot that is already used up.
+ continue;
+ }
+ final double used = Math.min(remaining, available);
+ remaining -= used;
+ Resource.Builder newResource =
+ Resource.newBuilder(r.build())
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build());
+ if (revocable) {
+ newResource.setRevocable(Resource.RevocableInfo.newBuilder());
+ }
+ result.add(newResource.build());
+ r.getScalarBuilder().setValue(available - used);
+ }
+ if (!nearZero(remaining)) {
+ // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+ // consistent.
+ // Maybe we should consider implementing resource veto with this class to ensure that.
+ throw new Resources.InsufficientResourcesException(
+ "Insufficient resource when allocating from offer");
+ }
+ return result.build();
+ }
+
+ private static List<Resource.Builder> filterToBuilders(
+ List<Resource> resources,
+ String name,
+ Predicate<Resource> additionalFilter) {
+
+ return FluentIterable.from(resources)
+ .filter(e -> e.getName().equals(name))
+ .filter(additionalFilter)
+ .transform(Resource::toBuilder)
+ .toList();
+ }
+
+ private static List<Resource.Builder> filterToBuilderNonRevocable(
+ List<Resource> resources,
+ String name) {
+
+ return filterToBuilders(resources, name, Resources.NON_REVOCABLE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index 7c3d681..86f2667 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -23,7 +23,6 @@ import java.util.function.Predicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
@@ -43,7 +42,6 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.common.quantity.Data.BYTES;
import static org.apache.aurora.scheduler.ResourceType.CPUS;
import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
-import static org.apache.aurora.scheduler.ResourceType.PORTS;
import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
/**
@@ -62,6 +60,15 @@ public final class ResourceSlot {
public static final ResourceSlot NONE =
new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0);
+ /**
+ * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}.
+ */
+ public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
+ input -> Protos.Value.Range.newBuilder()
+ .setBegin(input.lowerEndpoint())
+ .setEnd(input.upperEndpoint())
+ .build();
+
public ResourceSlot(
double numCpus,
Amount<Long, Data> ram,
@@ -90,26 +97,6 @@ public final class ResourceSlot {
}
/**
- * Adapts this slot object to a list of Mesos resources.
- *
- * @param selectedPorts The ports selected, to be applied as concrete task ranges.
- * @param tierInfo Task tier info.
- * @return Mesos resources.
- */
- public List<Protos.Resource> toResourceList(Set<Integer> selectedPorts, TierInfo tierInfo) {
- ImmutableList.Builder<Protos.Resource> resourceBuilder =
- ImmutableList.<Protos.Resource>builder()
- .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable()))
- .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false))
- .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false));
- if (!selectedPorts.isEmpty()) {
- resourceBuilder.add(makeMesosRangeResource(PORTS, selectedPorts));
- }
-
- return resourceBuilder.build();
- }
-
- /**
* Ensures that the revocable setting on the executor and task CPU resources match.
*
* @param task Task to check for resource type alignment.
@@ -142,23 +129,26 @@ public final class ResourceSlot {
/**
* Convenience method for adapting to Mesos resources without applying a port range.
*
- * @see {@link #toResourceList(java.util.Set, TierInfo)}
* @param tierInfo Task tier info.
* @return Mesos resources.
*/
public List<Protos.Resource> toResourceList(TierInfo tierInfo) {
- return toResourceList(ImmutableSet.of(), tierInfo);
+ return ImmutableList.<Protos.Resource>builder()
+ .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable()))
+ .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false))
+ .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false))
+ .build();
}
/**
* Creates a mesos resource of integer ranges.
*
* @param resourceType Resource type.
- * @param values Values to translate into ranges.
- * @return A mesos ranges resource.
+ * @param values Values to translate into ranges.
+ * @return A new mesos ranges resource.
*/
@VisibleForTesting
- static Protos.Resource makeMesosRangeResource(
+ public static Protos.Resource makeMesosRangeResource(
ResourceType resourceType,
Set<Integer> values) {
@@ -348,10 +338,4 @@ public final class ResourceSlot {
};
private static final Predicate<Integer> IS_ZERO = e -> e == 0;
-
- private static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
- input -> Protos.Value.Range.newBuilder()
- .setBegin(input.lowerEndpoint())
- .setEnd(input.upperEndpoint())
- .build();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index db422a9..4baf9dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -62,6 +62,14 @@ public final class Resources {
*/
public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
+ /**
+ * Convert range to set of integers.
+ */
+ public static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
+ range -> ContiguousSet.create(
+ com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
+ DiscreteDomain.integers());
+
private final Iterable<Resource> mesosResources;
private Resources(Iterable<Resource> mesosResources) {
@@ -145,38 +153,33 @@ public final class Resources {
}
private double getScalarValue(String key) {
- Resource resource = getResource(key);
- if (resource == null) {
- return 0;
+ Iterable<Resource> resources = getResources(key);
+ double value = 0;
+ for (Resource r : resources) {
+ value += r.getScalar().getValue();
}
-
- return resource.getScalar().getValue();
+ return value;
}
- private Resource getResource(String key) {
- return Iterables.find(mesosResources, e -> e.getName().equals(key), null);
+ private Iterable<Resource> getResources(String key) {
+ return Iterables.filter(mesosResources, e -> e.getName().equals(key));
}
private Iterable<Range> getPortRanges() {
- Resource resource = getResource(PORTS.getName());
- if (resource == null) {
- return ImmutableList.of();
+ ImmutableList.Builder<Range> ranges = ImmutableList.builder();
+ for (Resource r : getResources(PORTS.getName())) {
+ ranges.addAll(r.getRanges().getRangeList().iterator());
}
- return resource.getRanges().getRangeList();
+ return ranges.build();
}
/**
* Thrown when there are insufficient resources to satisfy a request.
*/
- static class InsufficientResourcesException extends RuntimeException {
+ public static class InsufficientResourcesException extends RuntimeException {
InsufficientResourcesException(String message) {
super(message);
}
}
-
- private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
- range -> ContiguousSet.create(
- com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
- DiscreteDomain.integers());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
index 2255dd4..7de8f4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
@@ -89,6 +89,12 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
help = "Allows receiving revocable resource offers from Mesos.")
private static final Arg<Boolean> RECEIVE_REVOCABLE_RESOURCES = Arg.create(false);
+ @CmdLine(name = "mesos_role",
+ help = "The Mesos role this framework will register as. The default is to left this empty, "
+ + "and the framework will register without any role and only receive unreserved "
+ + "resources in offer.")
+ private static final Arg<String> MESOS_ROLE = Arg.create();
+
// TODO(wfarner): Figure out a way to change this without risk of fallout (MESOS-703).
private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
@@ -99,6 +105,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
if (FRAMEWORK_ANNOUNCE_PRINCIPAL.get() && credentials.isPresent()) {
principal = Optional.of(credentials.get().getPrincipal());
}
+ Optional<String> role =
+ MESOS_ROLE.hasAppliedValue() ? Optional.of(MESOS_ROLE.get()) : Optional.absent();
DriverSettings settings = new DriverSettings(
MESOS_MASTER_ADDRESS.get(),
credentials,
@@ -106,7 +114,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
EXECUTOR_USER.get(),
principal,
FRAMEWORK_FAILOVER_TIMEOUT.get(),
- RECEIVE_REVOCABLE_RESOURCES.get()));
+ RECEIVE_REVOCABLE_RESOURCES.get(),
+ role));
bind(DriverSettings.class).toInstance(settings);
}
@@ -138,7 +147,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
String executorUser,
Optional<String> principal,
Amount<Long, Time> failoverTimeout,
- boolean revocable) {
+ boolean revocable,
+ Optional<String> role) {
FrameworkInfo.Builder infoBuilder = FrameworkInfo.newBuilder()
.setUser(executorUser)
@@ -153,6 +163,11 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
if (revocable) {
infoBuilder.addCapabilities(Capability.newBuilder().setType(REVOCABLE_RESOURCES));
}
+
+ if (role.isPresent()) {
+ infoBuilder.setRole(role.get());
+ }
+
return infoBuilder.build();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index 8fdadda..fcad0e7 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.scheduler.AcceptedOffer;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.Resources;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.SchedulerException;
@@ -38,6 +40,7 @@ import org.apache.mesos.Protos;
import org.apache.mesos.Protos.ContainerInfo;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskID;
@@ -56,11 +59,11 @@ public interface MesosTaskFactory {
* Creates a mesos task object.
*
* @param task Assigned task to translate into a task object.
- * @param slaveId Id of the slave the task is being assigned to.
+ * @param offer Resource offer the task is being assigned to.
* @return A new task.
* @throws SchedulerException If the task could not be encoded.
*/
- TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+ TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException;
// TODO(wfarner): Move this class to its own file to reduce visibility to package private.
class MesosTaskFactoryImpl implements MesosTaskFactory {
@@ -95,9 +98,11 @@ public interface MesosTaskFactory {
}
@Override
- public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+ public TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException {
requireNonNull(task);
- requireNonNull(slaveId);
+ requireNonNull(offer);
+
+ SlaveID slaveId = offer.getSlaveId();
byte[] taskInBytes;
try {
@@ -108,13 +113,21 @@ public interface MesosTaskFactory {
}
ITaskConfig config = task.getTask();
-
+ AcceptedOffer acceptedOffer;
// TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
- List<Resource> resources = ResourceSlot.from(config).toResourceList(
- task.isSetAssignedPorts()
- ? ImmutableSet.copyOf(task.getAssignedPorts().values())
- : ImmutableSet.of(),
- tierManager.getTier(task.getTask()));
+ try {
+ acceptedOffer = AcceptedOffer.create(
+ offer,
+ ResourceSlot.from(config),
+ executorSettings.getExecutorOverhead(),
+ task.isSetAssignedPorts()
+ ? ImmutableSet.copyOf(task.getAssignedPorts().values())
+ : ImmutableSet.of(),
+ tierManager.getTier(task.getTask()));
+ } catch (Resources.InsufficientResourcesException e) {
+ throw new SchedulerException(e);
+ }
+ List<Resource> resources = acceptedOffer.getTaskResources();
LOG.debug(
"Setting task resources to {}",
@@ -128,9 +141,9 @@ public interface MesosTaskFactory {
.setData(ByteString.copyFrom(taskInBytes));
if (config.getContainer().isSetMesos()) {
- configureTaskForNoContainer(task, config, taskBuilder);
+ configureTaskForNoContainer(task, config, taskBuilder, acceptedOffer);
} else if (config.getContainer().isSetDocker()) {
- configureTaskForDockerContainer(task, config, taskBuilder);
+ configureTaskForDockerContainer(task, config, taskBuilder, acceptedOffer);
} else {
throw new SchedulerException("Task had no supported container set.");
}
@@ -141,15 +154,17 @@ public interface MesosTaskFactory {
private void configureTaskForNoContainer(
IAssignedTask task,
ITaskConfig config,
- TaskInfo.Builder taskBuilder) {
+ TaskInfo.Builder taskBuilder,
+ AcceptedOffer acceptedOffer) {
- taskBuilder.setExecutor(configureTaskForExecutor(task, config).build());
+ taskBuilder.setExecutor(configureTaskForExecutor(task, config, acceptedOffer).build());
}
private void configureTaskForDockerContainer(
IAssignedTask task,
ITaskConfig taskConfig,
- TaskInfo.Builder taskBuilder) {
+ TaskInfo.Builder taskBuilder,
+ AcceptedOffer acceptedOffer) {
IDockerContainer config = taskConfig.getContainer().getDocker();
Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
@@ -164,7 +179,7 @@ public interface MesosTaskFactory {
configureContainerVolumes(containerBuilder);
- ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig)
+ ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig, acceptedOffer)
.setContainer(containerBuilder.build());
taskBuilder.setExecutor(execBuilder.build());
@@ -172,11 +187,18 @@ public interface MesosTaskFactory {
private ExecutorInfo.Builder configureTaskForExecutor(
IAssignedTask task,
- ITaskConfig config) {
+ ITaskConfig config,
+ AcceptedOffer acceptedOffer) {
- return executorSettings.getExecutorConfig().getExecutor().toBuilder()
+ ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
.setExecutorId(getExecutorId(task.getTaskId()))
.setSource(getInstanceSourceName(config, task.getInstanceId()));
+ List<Resource> executorResources = acceptedOffer.getExecutorResources();
+ LOG.debug(
+ "Setting executor resources to {}",
+ Iterables.transform(executorResources, Protobufs::toString));
+ builder.clearResources().addAllResources(executorResources);
+ return builder;
}
private void configureContainerVolumes(ContainerInfo.Builder containerBuilder) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7e8e456..0c467a6 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -126,7 +126,7 @@ public interface TaskAssigner {
LOG.info(
"Offer on slave {} (id {}) is being assigned task for {}.",
host, offer.getSlaveId().getValue(), taskId);
- return taskFactory.createFrom(assigned, offer.getSlaveId());
+ return taskFactory.createFrom(assigned, offer);
}
@Timed("assigner_maybe_assign")
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
new file mode 100644
index 0000000..39096af
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Resource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AcceptedOfferTest {
+
+ private static final Optional<String> TEST_ROLE = Optional.of("test-role");
+ private static final Optional<String> ABSENT_ROLE = Optional.absent();
+ private static final ResourceSlot TASK_SLOT = new ResourceSlot(
+ 4, Amount.of(100L, Data.MB), Amount.of(200L, Data.MB), 0);
+ private static final ResourceSlot EXECUTOR_SLOT = new ResourceSlot(
+ 0.25, Amount.of(25L, Data.MB), Amount.of(75L, Data.MB), 0);
+ private static final ResourceSlot TOTAL_SLOT = EXECUTOR_SLOT.add(TASK_SLOT);
+ private static final Integer[] TASK_PORTS = {80, 90};
+ private static final Set<Integer> TASK_PORTS_SET = ImmutableSet.copyOf(TASK_PORTS);
+
+ @Test
+ public void testReservedPredicates() {
+ Protos.Resource withRole = makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, 1.0);
+ assertTrue(AcceptedOffer.RESERVED.apply(withRole));
+ assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole));
+ Protos.Resource absentRole = makeScalar(ResourceType.CPUS.getName(), ABSENT_ROLE, false, 1.0);
+ assertFalse(AcceptedOffer.RESERVED.apply(absentRole));
+ assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole));
+ }
+
+ @Test
+ public void testAllocateEmpty() {
+ AcceptedOffer acceptedOffer = AcceptedOffer.create(
+ fakeOffer(Collections.emptyList()),
+ ResourceSlot.NONE,
+ ResourceSlot.NONE,
+ ImmutableSet.of(),
+ TierInfo.DEFAULT);
+ assertEquals(Collections.emptyList(), acceptedOffer.getTaskResources());
+ assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
+ }
+
+ @Test
+ public void testAllocateRange() {
+ List<Resource> resources = ImmutableList.<Resource>builder()
+ .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93))
+ .add(makePortResource(TEST_ROLE, 100, 101))
+ .build();
+ AcceptedOffer acceptedOffer = AcceptedOffer.create(
+ fakeOffer(resources),
+ ResourceSlot.NONE,
+ ResourceSlot.NONE,
+ ImmutableSet.of(80, 90, 100),
+ TierInfo.DEFAULT);
+
+ List<Resource> expected = ImmutableList.<Resource>builder()
+ // Because we prefer reserved resources and handle them before non-reserved resources,
+ // result should have ports for the reserved resources first.
+ .add(makePortResource(TEST_ROLE, 100))
+ .add(makePortResource(Optional.absent(), 80, 90))
+ .build();
+ assertEquals(expected, acceptedOffer.getTaskResources());
+ assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
+ }
+
+ @Test(expected = Resources.InsufficientResourcesException.class)
+ public void testAllocateRangeInsufficent() {
+ List<Resource> resources = ImmutableList.of(
+ makePortResource(ABSENT_ROLE, 80),
+ makePortResource(ABSENT_ROLE, 100, 101));
+ AcceptedOffer.create(
+ fakeOffer(resources),
+ ResourceSlot.NONE,
+ ResourceSlot.NONE,
+ ImmutableSet.of(80, 90, 100),
+ TierInfo.DEFAULT);
+ }
+
+ @Test
+ public void testAllocateSingleRole() {
+ runAllocateSingleRole(ABSENT_ROLE, false);
+ runAllocateSingleRole(ABSENT_ROLE, true);
+ runAllocateSingleRole(TEST_ROLE, false);
+ runAllocateSingleRole(TEST_ROLE, true);
+ }
+
+ private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) {
+ List<Resource> resources = ImmutableList.<Resource>builder()
+ .add(makeScalar(
+ ResourceType.CPUS.getName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(), role, false, TOTAL_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(role, TASK_PORTS))
+ .build();
+ Protos.Offer offer = fakeOffer(resources);
+
+ AcceptedOffer offerAllocation = AcceptedOffer.create(
+ offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable));
+
+ List<Resource> taskList = ImmutableList.<Resource>builder()
+ .add(makeScalar(ResourceType.CPUS.getName(), role, cpuRevocable, TASK_SLOT.getNumCpus()))
+ .add(makeScalar(ResourceType.RAM_MB.getName(), role, false, TASK_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), role, false, TASK_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(role, TASK_PORTS))
+ .build();
+ assertEquals(taskList, offerAllocation.getTaskResources());
+
+ List<Resource> executorList = ImmutableList.<Resource>builder()
+ .add(makeScalar(
+ ResourceType.CPUS.getName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+ .build();
+ assertEquals(executorList, offerAllocation.getExecutorResources());
+ }
+
+ @Test(expected = Resources.InsufficientResourcesException.class)
+ public void testAllocateSingleRoleInsufficient() {
+ List<Resource> resources = ImmutableList.<Resource>builder()
+ // EXECUTOR_SLOT's CPU is not included here.
+ .add(makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, TASK_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(TEST_ROLE, TASK_PORTS))
+ .build();
+ Protos.Offer offer = fakeOffer(resources);
+
+ AcceptedOffer.create(
+ offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false));
+ }
+
+ @Test
+ public void testMultipleRoles() {
+ runMultipleRoles(false);
+ runMultipleRoles(true);
+ }
+
+ private void runMultipleRoles(boolean cpuRevocable) {
+ List<Resource> resources = ImmutableList.<Resource>builder()
+ // Make cpus come from two roles.
+ .add(makeScalar(
+ ResourceType.CPUS.getName(),
+ TEST_ROLE,
+ cpuRevocable,
+ EXECUTOR_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.CPUS.getName(),
+ ABSENT_ROLE,
+ cpuRevocable,
+ TASK_SLOT.getNumCpus()))
+ // Make ram come from default role
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(),
+ ABSENT_ROLE,
+ false,
+ TOTAL_SLOT.getRam().as(Data.MB)))
+ // Make disk come from non-default role.
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(),
+ TEST_ROLE,
+ false,
+ TOTAL_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(TEST_ROLE, 80))
+ .add(makePortResource(ABSENT_ROLE, 90))
+ .build();
+
+ Protos.Offer offer = fakeOffer(resources);
+
+ AcceptedOffer offerAllocation = AcceptedOffer.create(
+ offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable));
+
+ List<Resource> taskList = ImmutableList.<Resource>builder()
+ // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus.
+ .add(makeScalar(
+ ResourceType.CPUS.getName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.CPUS.getName(),
+ ABSENT_ROLE,
+ cpuRevocable,
+ TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus()))
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(TEST_ROLE, 80))
+ .add(makePortResource(ABSENT_ROLE, 90))
+ .build();
+ assertEquals(taskList, offerAllocation.getTaskResources());
+
+ List<Resource> executorList = ImmutableList.<Resource>builder()
+ .add(makeScalar(
+ ResourceType.CPUS.getName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+ .build();
+ assertEquals(executorList, offerAllocation.getExecutorResources());
+ }
+
+ @Test(expected = Resources.InsufficientResourcesException.class)
+ public void testMultipleRolesInsufficient() {
+ // Similar to testMultipleRoles, but make some of cpus as revocable
+ List<Resource> resources = ImmutableList.<Resource>builder()
+ // Make cpus come from two roles.
+ .add(makeScalar(
+ ResourceType.CPUS.getName(),
+ TEST_ROLE,
+ true,
+ EXECUTOR_SLOT.getNumCpus()))
+ .add(makeScalar(
+ ResourceType.CPUS.getName(),
+ ABSENT_ROLE,
+ false,
+ TASK_SLOT.getNumCpus()))
+ // Make ram come from default role
+ .add(makeScalar(
+ ResourceType.RAM_MB.getName(),
+ ABSENT_ROLE,
+ false,
+ TOTAL_SLOT.getRam().as(Data.MB)))
+ // Make disk come from non-default role.
+ .add(makeScalar(
+ ResourceType.DISK_MB.getName(),
+ TEST_ROLE,
+ false,
+ TOTAL_SLOT.getDisk().as(Data.MB)))
+ .add(makePortResource(TEST_ROLE, 80))
+ .add(makePortResource(ABSENT_ROLE, 90))
+ .build();
+ Protos.Offer offer = fakeOffer(resources);
+ // We don't have enough resource to satisfy a non-revocable request.
+ AcceptedOffer.create(
+ offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false));
+ }
+
+ private static Resource makePortResource(Optional<String> role, Integer... values) {
+ Resource.Builder prototype = Resource.newBuilder()
+ .setType(Protos.Value.Type.RANGES)
+ .setName(ResourceType.PORTS.getName());
+ if (role.isPresent()) {
+ prototype.setRole(role.get());
+ }
+ return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values));
+ }
+
+ private static Resource makeScalar(
+ String name, Optional<String> role, boolean revocable, double value) {
+ Resource.Builder resource = Resource.newBuilder()
+ .setName(name)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
+ if (role.isPresent()) {
+ resource.setRole(role.get());
+ }
+ if (revocable) {
+ resource.setRevocable(Resource.RevocableInfo.getDefaultInstance());
+ }
+ return resource.build();
+ }
+
+ private static Protos.Offer fakeOffer(List<Resource> resources) {
+ return Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+ .setHostname("hostname")
+ .addAllResources(resources)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
index e4ae943..52113b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
@@ -88,29 +88,25 @@ public class ResourceSlotTest {
}
@Test
- public void testToResourceListNoRevocable() {
+ public void testToResourceListNoRevoca() {
ResourceSlot resources = ResourceSlot.from(TASK);
- Set<Integer> ports = ImmutableSet.of(80, 443);
assertEquals(
ImmutableSet.of(
makeMesosResource(CPUS, TASK.getNumCpus(), false),
makeMesosResource(RAM_MB, TASK.getRamMb(), false),
- makeMesosResource(DISK_MB, TASK.getDiskMb(), false),
- makeMesosRangeResource(PORTS, ports)),
- ImmutableSet.copyOf(resources.toResourceList(ports, DEFAULT)));
+ makeMesosResource(DISK_MB, TASK.getDiskMb(), false)),
+ ImmutableSet.copyOf(resources.toResourceList(DEFAULT)));
}
@Test
public void testToResourceListRevocable() {
ResourceSlot resources = ResourceSlot.from(TASK);
- Set<Integer> ports = ImmutableSet.of(80, 443);
assertEquals(
ImmutableSet.of(
makeMesosResource(CPUS, TASK.getNumCpus(), true),
makeMesosResource(RAM_MB, TASK.getRamMb(), false),
- makeMesosResource(DISK_MB, TASK.getDiskMb(), false),
- makeMesosRangeResource(PORTS, ports)),
- ImmutableSet.copyOf(resources.toResourceList(ports, REVOCABLE_TIER)));
+ makeMesosResource(DISK_MB, TASK.getDiskMb(), false)),
+ ImmutableSet.copyOf(resources.toResourceList(REVOCABLE_TIER)));
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
index 33149ab..dc964b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
@@ -28,9 +28,13 @@ import org.junit.Test;
import static org.apache.mesos.Protos.FrameworkInfo.Capability.Type.REVOCABLE_RESOURCES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class CommandLineDriverSettingsModuleTest {
+ private static final String TEST_ROLE = "test-role";
+
@Test(expected = IllegalStateException.class)
public void testMissingPropertiesParsing() {
Properties testProperties = new Properties();
@@ -72,9 +76,11 @@ public class CommandLineDriverSettingsModuleTest {
"user",
Optional.absent(),
Amount.of(1L, Time.MINUTES),
- false);
+ false,
+ Optional.absent());
assertEquals("", info.getPrincipal());
assertEquals(0, info.getCapabilitiesCount());
+ assertFalse(info.hasRole());
}
@Test
@@ -83,10 +89,12 @@ public class CommandLineDriverSettingsModuleTest {
"user",
Optional.absent(),
Amount.of(1L, Time.MINUTES),
- true);
+ true,
+ Optional.absent());
assertEquals("", info.getPrincipal());
assertEquals(1, info.getCapabilitiesCount());
assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType());
+ assertFalse(info.hasRole());
}
@Test
@@ -95,20 +103,25 @@ public class CommandLineDriverSettingsModuleTest {
"user",
Optional.of("auroraprincipal"),
Amount.of(1L, Time.MINUTES),
- false);
+ false,
+ Optional.absent());
assertEquals("auroraprincipal", info.getPrincipal());
assertEquals(0, info.getCapabilitiesCount());
+ assertFalse(info.hasRole());
}
@Test
- public void testFrameworkInfoRevocableWithAnnouncedPrincipal() {
+ public void testFrameworkInfoRevocableWithAnnouncedPrincipalAndRole() {
Protos.FrameworkInfo info = CommandLineDriverSettingsModule.buildFrameworkInfo(
"user",
Optional.of("auroraprincipal"),
Amount.of(1L, Time.MINUTES),
- true);
+ true,
+ Optional.of(TEST_ROLE));
assertEquals("auroraprincipal", info.getPrincipal());
assertEquals(1, info.getCapabilitiesCount());
assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType());
+ assertTrue(info.hasRole());
+ assertEquals(TEST_ROLE, info.getRole());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index a5793bf..066c6a3 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -13,6 +13,8 @@
*/
package org.apache.aurora.scheduler.mesos;
+import java.util.stream.Collectors;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -27,6 +29,7 @@ import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.MesosContainer;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.ResourceType;
import org.apache.aurora.scheduler.Resources;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
@@ -37,6 +40,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.ContainerInfo.DockerInfo;
import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Parameter;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
@@ -46,9 +50,11 @@ import org.apache.mesos.Protos.Volume.Mode;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource;
import static org.apache.aurora.scheduler.TierInfo.DEFAULT;
import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXECUTOR;
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG;
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
import static org.easymock.EasyMock.expect;
@@ -85,6 +91,23 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
ImmutableList.of(new DockerParameter("label", "testparameter")))))));
private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
+ private static final Offer OFFER_THERMOS_EXECUTOR = Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+ .setSlaveId(SLAVE)
+ .setHostname("slave-hostname")
+ .addAllResources(
+ ResourceSlot.from(TASK_CONFIG).add(THERMOS_EXECUTOR.getExecutorOverhead())
+ .toResourceList(DEFAULT))
+ .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80)))
+ .build();
+ private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
+ .clearResources()
+ .addAllResources(
+ ResourceSlot.from(TASK_CONFIG).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead())
+ .toResourceList(DEFAULT))
+ .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80)))
+ .build();
private MesosTaskFactory taskFactory;
private ExecutorSettings config;
@@ -106,6 +129,18 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
.build();
}
+ private static ExecutorInfo purgeZeroResources(ExecutorInfo executor) {
+ return executor.toBuilder()
+ .clearResources()
+ .addAllResources(
+ executor.getResourcesList()
+ .stream()
+ .filter(
+ e -> !e.hasScalar() || e.getScalar().getValue() > 0)
+ .collect(Collectors.toList()))
+ .build();
+ }
+
@Test
public void testExecutorInfoUnchanged() {
expect(tierManager.getTier(TASK_CONFIG)).andReturn(DEFAULT);
@@ -113,7 +148,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
control.replay();
- TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+ TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
assertEquals(populateDynamicFields(DEFAULT_EXECUTOR, TASK), task.getExecutor());
checkTaskResources(TASK.getTask(), task);
@@ -124,9 +159,17 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
expect(tierManager.getTier(TASK_CONFIG)).andReturn(REVOCABLE_TIER);
taskFactory = new MesosTaskFactoryImpl(config, tierManager);
+ Resource revocableCPU = OFFER_THERMOS_EXECUTOR.getResources(0).toBuilder()
+ .setRevocable(Resource.RevocableInfo.getDefaultInstance())
+ .build();
+ Offer withRevocable = OFFER_THERMOS_EXECUTOR.toBuilder()
+ .removeResources(0)
+ .addResources(0, revocableCPU)
+ .build();
+
control.replay();
- TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+ TaskInfo task = taskFactory.createFrom(TASK, withRevocable);
checkTaskResources(TASK.getTask(), task);
assertTrue(task.getResourcesList().stream().anyMatch(Resource::hasRevocable));
}
@@ -142,7 +185,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
control.replay();
- TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), SLAVE);
+ TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), OFFER_THERMOS_EXECUTOR);
checkTaskResources(ITaskConfig.build(builder.getTask()), task);
}
@@ -156,9 +199,10 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
control.replay();
- TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+ TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
assertEquals(
- populateDynamicFields(NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK),
+ purgeZeroResources(populateDynamicFields(
+ NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK)),
task.getExecutor());
// Simulate the upsizing needed for the task to meet the minimum thermos requirements.
@@ -177,13 +221,14 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
}
private TaskInfo getDockerTaskInfo(IAssignedTask task) {
- config = TaskExecutors.SOME_OVERHEAD_EXECUTOR;
+ config = SOME_OVERHEAD_EXECUTOR;
+
expect(tierManager.getTier(task.getTask())).andReturn(DEFAULT);
taskFactory = new MesosTaskFactoryImpl(config, tierManager);
control.replay();
- return taskFactory.createFrom(task, SLAVE);
+ return taskFactory.createFrom(task, OFFER_SOME_OVERHEAD_EXECUTOR);
}
@Test
@@ -217,7 +262,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
control.replay();
- TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, SLAVE);
+ TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, OFFER_THERMOS_EXECUTOR);
assertEquals(
config.getExecutorConfig().getVolumeMounts(),
taskInfo.getExecutor().getContainer().getVolumesList());
http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 3cbe9ac..b00add0 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -135,7 +135,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
MESOS_OFFER.getSlaveId(),
ImmutableMap.of(PORT_NAME, PORT)))
.andReturn(TASK.getAssignedTask());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
.andReturn(TASK_INFO);
control.replay();
@@ -204,7 +204,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
LOST,
LAUNCH_FAILED_MSG))
.andReturn(StateChangeResult.SUCCESS);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
.andReturn(TASK_INFO);
control.replay();
@@ -261,7 +261,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
offer.getOffer().getSlaveId(),
ImmutableMap.of(PORT_NAME, PORT)))
.andReturn(TASK.getAssignedTask());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer().getSlaveId()))
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
.andReturn(TASK_INFO);
offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
@@ -313,7 +313,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
OFFER.getOffer().getSlaveId(),
ImmutableMap.of(PORT_NAME, PORT)))
.andReturn(TASK.getAssignedTask());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer().getSlaveId()))
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
.andReturn(TASK_INFO);
offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);