You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/05/02 18:05:01 UTC
aurora git commit: Generalizing port resource management.
Repository: aurora
Updated Branches:
refs/heads/master 450d88156 -> d8d1c8d68
Generalizing port resource management.
Reviewed at https://reviews.apache.org/r/46810/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d8d1c8d6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d8d1c8d6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d8d1c8d6
Branch: refs/heads/master
Commit: d8d1c8d6889b39c5da0ca95496ef8bda7827fb4a
Parents: 450d881
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon May 2 09:04:54 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon May 2 09:04:54 2016 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationManager.java | 19 +-
.../scheduler/filter/SchedulingFilter.java | 4 -
.../scheduler/resources/ResourceManager.java | 81 +++++++
.../scheduler/resources/ResourceMapper.java | 84 ++++++++
.../scheduler/resources/ResourceSlot.java | 3 +-
.../scheduler/resources/ResourceType.java | 43 +++-
.../aurora/scheduler/resources/Resources.java | 31 ---
.../aurora/scheduler/state/StateManager.java | 7 +-
.../scheduler/state/StateManagerImpl.java | 13 +-
.../aurora/scheduler/state/TaskAssigner.java | 28 +--
.../configuration/ConfigurationManagerTest.java | 11 +
.../filter/SchedulingFilterImplTest.java | 25 +--
.../mesos/MesosTaskFactoryImplTest.java | 7 +-
.../scheduler/resources/AcceptedOfferTest.java | 213 ++++++-------------
.../scheduler/resources/PortMapperTest.java | 62 ++++++
.../resources/ResourceManagerTest.java | 88 ++++++++
.../scheduler/resources/ResourceTestUtil.java | 85 ++++++++
.../scheduler/resources/ResourcesTest.java | 144 ++-----------
.../scheduler/state/StateManagerImplTest.java | 23 +-
.../scheduler/state/TaskAssignerImplTest.java | 89 ++++----
20 files changed, 645 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 9a15a4b..e1ce638 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -38,10 +38,12 @@ import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.UserProvidedStrings;
+import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IContainer;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
@@ -49,6 +51,8 @@ import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+
/**
* Manages translation from a string-mapped configuration to a concrete configuration type, and
* defaults for optional values.
@@ -224,8 +228,6 @@ public class ConfigurationManager {
builder.setRequestedPorts(ImmutableSet.of());
}
- maybeFillLinks(builder);
-
if (config.isSetTier() && !UserProvidedStrings.isGoodIdentifier(config.getTier())) {
throw new TaskDescriptionException("Tier contains illegal characters: " + config.getTier());
}
@@ -327,6 +329,8 @@ public class ConfigurationManager {
throw new TaskDescriptionException("Multiple resource values are not supported for " + types);
}
+ maybeFillLinks(builder);
+
return ITaskConfig.build(builder);
}
@@ -343,11 +347,12 @@ public class ConfigurationManager {
private static void maybeFillLinks(TaskConfig task) {
if (task.getTaskLinksSize() == 0) {
ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
- if (task.getRequestedPorts().contains("health")) {
- links.put("health", "http://%host%:%port:health%");
- }
- if (task.getRequestedPorts().contains("http")) {
- links.put("http", "http://%host%:%port:http%");
+ for (IResource resource : ResourceManager.getTaskResources(ITaskConfig.build(task), PORTS)) {
+ if (resource.getNamedPort().equals("health")) {
+ links.put("health", "http://%host%:%port:health%");
+ } else if (resource.getNamedPort().equals("http")) {
+ links.put("http", "http://%host%:%port:http%");
+ }
}
task.setTaskLinks(links.build());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 625e6d5..1ee2cfa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -303,10 +303,6 @@ public interface SchedulingFilter {
return jobState;
}
- public Set<String> getRequestedPorts() {
- return task.getRequestedPorts();
- }
-
@Override
public boolean equals(Object o) {
if (!(o instanceof ResourceRequest)) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
new file mode 100644
index 0000000..8b42bf0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.Resource;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * Manages resources and provides Aurora/Mesos translation.
+ */
+public final class ResourceManager {
+ private ResourceManager() {
+ // Utility class.
+ }
+
+ /**
+ * Gets offer resources matching specified {@link ResourceType}.
+ *
+ * @param offer Offer to get resources from.
+ * @param type {@link ResourceType} to filter resources by.
+ * @return Offer resources matching {@link ResourceType}.
+ */
+ public static Iterable<Resource> getOfferResources(Offer offer, ResourceType type) {
+ return Iterables.filter(offer.getResourcesList(), r -> r.getName().equals(type.getMesosName()));
+ }
+
+ /**
+ * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}.
+ *
+ * @param task Scheduled task to get resources from.
+ * @param type {@link ResourceType} to filter resources by.
+ * @return Task resources matching {@link ResourceType}.
+ */
+ public static Iterable<IResource> getTaskResources(IScheduledTask task, ResourceType type) {
+ return getTaskResources(task.getAssignedTask().getTask(), type);
+ }
+
+ /**
+ * Gets task resources matching specified {@link ResourceType}.
+ *
+ * @param task Task config to get resources from.
+ * @param type {@link ResourceType} to filter resources by.
+ * @return Task resources matching {@link ResourceType}.
+ */
+ public static Iterable<IResource> getTaskResources(ITaskConfig task, ResourceType type) {
+ return Iterables.filter(task.getResources(), r -> ResourceType.fromResource(r).equals(type));
+ }
+
+ /**
+ * Gets unique task resource types.
+ *
+ * @param task Task to get resource types from.
+ * @return Set of {@link ResourceType} instances representing task resources.
+ */
+ public static Set<ResourceType> getTaskResourceTypes(IScheduledTask task) {
+ return EnumSet.copyOf(task.getAssignedTask().getTask().getResources().stream()
+ .map(r -> ResourceType.fromResource(r))
+ .collect(Collectors.toSet()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
new file mode 100644
index 0000000..c06ce8d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos.Offer;
+
+import static java.util.stream.StreamSupport.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.DiscreteDomain.integers;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+
+/**
+ * Maps requested (task) resources to available (offer) resources.
+ */
+public interface ResourceMapper {
+
+ /**
+ * Maps task resources to offer resources and returns a new task with updated mapping.
+ *
+ * @param offer Offer with available resources.
+ * @param task Task with requested resources.
+ * @return A new task with updated mapping.
+ */
+ IScheduledTask mapAndAssign(Offer offer, IScheduledTask task);
+
+ PortMapper PORT_MAPPER = new PortMapper();
+
+ class PortMapper implements ResourceMapper {
+ @Override
+ public IScheduledTask mapAndAssign(Offer offer, IScheduledTask task) {
+ List<Integer> availablePorts =
+ stream(ResourceManager.getOfferResources(offer, PORTS).spliterator(), false)
+ .flatMap(resource -> resource.getRanges().getRangeList().stream())
+ .flatMap(range -> ContiguousSet.create(
+ Range.closed((int) range.getBegin(), (int) range.getEnd()),
+ integers()).stream())
+ .collect(Collectors.toList());
+
+ Collections.shuffle(availablePorts);
+
+ List<String> requestedPorts =
+ stream(ResourceManager.getTaskResources(task, PORTS).spliterator(), false)
+ .map(e -> e.getNamedPort())
+ .collect(Collectors.toList());
+
+ checkState(
+ availablePorts.size() >= requestedPorts.size(),
+ String.format("Insufficient ports %d when matching %s", availablePorts.size(), task));
+
+ Iterator<Integer> ports = availablePorts.iterator();
+ Map<String, Integer> portMap =
+ requestedPorts.stream().collect(Collectors.toMap(key -> key, value -> ports.next()));
+
+ ScheduledTask builder = task.newBuilder();
+ builder.getAssignedTask().setAssignedPorts(ImmutableMap.copyOf(portMap));
+ return IScheduledTask.build(builder);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
index 1df2c11..a8dee95 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
@@ -43,6 +43,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.common.quantity.Data.BYTES;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
/**
@@ -94,7 +95,7 @@ public final class ResourceSlot {
task.getNumCpus(),
Amount.of(task.getRamMb(), Data.MB),
Amount.of(task.getDiskMb(), Data.MB),
- task.getRequestedPorts().size());
+ Iterables.size(ResourceManager.getTaskResources(task, PORTS)));
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
index 6e4d694..ee2b51a 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.resources;
import java.util.EnumSet;
+import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
@@ -29,6 +30,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.common.quantity.Data.GB;
import static org.apache.aurora.common.quantity.Data.MB;
+import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER;
import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.DOUBLE;
import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.LONG;
import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.STRING;
@@ -43,22 +45,38 @@ public enum ResourceType implements TEnum {
/**
* CPU resource.
*/
- CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, "CPU", 16, false),
+ CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, Optional.empty(), "CPU", 16, false),
/**
* RAM resource.
*/
- RAM_MB(_Fields.RAM_MB, SCALAR, "mem", LONG, "RAM", Amount.of(24, GB).as(MB), false),
+ RAM_MB(
+ _Fields.RAM_MB,
+ SCALAR,
+ "mem",
+ LONG,
+ Optional.empty(),
+ "RAM",
+ Amount.of(24, GB).as(MB),
+ false),
/**
* DISK resource.
*/
- DISK_MB(_Fields.DISK_MB, SCALAR, "disk", LONG, "disk", Amount.of(450, GB).as(MB), false),
+ DISK_MB(
+ _Fields.DISK_MB,
+ SCALAR,
+ "disk",
+ LONG,
+ Optional.empty(),
+ "disk",
+ Amount.of(450, GB).as(MB),
+ false),
/**
* Port resource.
*/
- PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, "ports", 1000, true);
+ PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, Optional.of(PORT_MAPPER), "ports", 1000, true);
/**
* Correspondent thrift {@link org.apache.aurora.gen.Resource} enum value.
@@ -81,6 +99,11 @@ public enum ResourceType implements TEnum {
private final ResourceTypeConverter<?> typeConverter;
/**
+ * Optional resource mapper to use.
+ */
+ private final Optional<ResourceMapper> mapper;
+
+ /**
* Aurora resource name.
*/
private final String auroraName;
@@ -105,6 +128,7 @@ public enum ResourceType implements TEnum {
* @param mesosType See {@link #getMesosType()} for more details.
* @param mesosName See {@link #getMesosName()} for more details.
* @param typeConverter See {@link #getTypeConverter()} for more details.
+ * @param mapper See {@link #getMapper()} for more details.
* @param auroraName See {@link #getAuroraName()} for more details.
* @param scalingRange See {@link #getScalingRange()} for more details.
* @param isMultipleAllowed See {@link #isMultipleAllowed()} for more details.
@@ -114,6 +138,7 @@ public enum ResourceType implements TEnum {
Protos.Value.Type mesosType,
String mesosName,
ResourceTypeConverter<?> typeConverter,
+ Optional<ResourceMapper> mapper,
String auroraName,
int scalingRange,
boolean isMultipleAllowed) {
@@ -123,6 +148,7 @@ public enum ResourceType implements TEnum {
this.mesosName = requireNonNull(mesosName);
this.typeConverter = requireNonNull(typeConverter);
this.auroraName = requireNonNull(auroraName);
+ this.mapper = requireNonNull(mapper);
this.scalingRange = scalingRange;
this.isMultipleAllowed = isMultipleAllowed;
}
@@ -171,6 +197,15 @@ public enum ResourceType implements TEnum {
}
/**
+ * Gets optional resource mapper. See {@link ResourceMapper} for more details.
+ *
+ * @return Optional ResourceMapper.
+ */
+ public Optional<ResourceMapper> getMapper() {
+ return mapper;
+ }
+
+ /**
* Gets resource name for internal Aurora representation (e.g. in the UI).
*
* @return Aurora resource name.
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
index 894c6a6..36d1de8 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.resources;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import com.google.common.base.Function;
@@ -23,10 +21,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
@@ -119,32 +114,6 @@ public final class Resources {
getNumAvailablePorts());
}
- /**
- * Attempts to grab {@code numPorts} from this resource instance.
- *
- * @param numPorts The number of ports to grab.
- * @return The set of ports grabbed.
- * @throws InsufficientResourcesException if not enough ports were available.
- */
- public Set<Integer> getPorts(int numPorts)
- throws InsufficientResourcesException {
-
- if (numPorts == 0) {
- return ImmutableSet.of();
- }
-
- List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat(
- Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS))));
-
- if (availablePorts.size() < numPorts) {
- throw new InsufficientResourcesException(
- String.format("Could not get %d ports from %s", numPorts, availablePorts));
- }
-
- Collections.shuffle(availablePorts);
- return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
- }
-
private int getNumAvailablePorts() {
int offeredPorts = 0;
for (Range range : getPortRanges()) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 5d34fe3..66bfd72 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -13,13 +13,14 @@
*/
package org.apache.aurora.scheduler.state;
-import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import com.google.common.base.Optional;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos.SlaveID;
@@ -62,7 +63,7 @@ public interface StateManager {
* @param taskId ID of the task to mutate.
* @param slaveHost Host name that the task is being assigned to.
* @param slaveId ID of the slave that the task is being assigned to.
- * @param assignedPorts Ports on the host that are being assigned to the task.
+ * @param resourceAssigner The resource assign operation.
* @return The updated task record, or {@code null} if the task was not found.
*/
IAssignedTask assignTask(
@@ -70,7 +71,7 @@ public interface StateManager {
String taskId,
String slaveHost,
SlaveID slaveId,
- Map<String, Integer> assignedPorts);
+ Function<IScheduledTask, IScheduledTask> resourceAssigner);
/**
* Inserts pending instances using {@code task} as their configuration. Tasks will immediately
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index e5b2f41..2b4fac1 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -18,11 +18,11 @@ import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -165,18 +165,18 @@ public class StateManagerImpl implements StateManager {
String taskId,
String slaveHost,
SlaveID slaveId,
- Map<String, Integer> assignedPorts) {
+ Function<IScheduledTask, IScheduledTask> resourceAssigner) {
checkNotBlank(taskId);
checkNotBlank(slaveHost);
requireNonNull(slaveId);
- requireNonNull(assignedPorts);
+ requireNonNull(resourceAssigner);
IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId,
task -> {
+ task = resourceAssigner.apply(task);
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask()
- .setAssignedPorts(assignedPorts)
.setSlaveHost(slaveHost)
.setSlaveId(slaveId.getValue());
return IScheduledTask.build(builder);
@@ -231,9 +231,6 @@ public class StateManagerImpl implements StateManager {
transitionMessage);
}
- private static final Function<SideEffect, Action> GET_ACTION =
- SideEffect::getAction;
-
private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
Action.INCREMENT_FAILURES,
Action.SAVE_STATE,
@@ -252,7 +249,7 @@ public class StateManagerImpl implements StateManager {
// (thus losing the object to copy), or rescheduling a task before incrementing the failure count
// (thus not carrying forward the failure increment).
private static final Ordering<SideEffect> ACTION_ORDER =
- Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
+ Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(SideEffect::getAction);
private StateChangeResult updateTaskAndExternalState(
TaskStore.Mutable taskStore,
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7d43d4a..1a3886f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler.state;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -22,8 +21,6 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.Stats;
@@ -38,8 +35,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.resources.Resources;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.TaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,26 +103,29 @@ public interface TaskAssigner {
this.tierManager = requireNonNull(tierManager);
}
+ @VisibleForTesting
+ IScheduledTask mapAndAssignResources(Offer offer, IScheduledTask task) {
+ IScheduledTask assigned = task;
+ for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
+ if (type.getMapper().isPresent()) {
+ assigned = type.getMapper().get().mapAndAssign(offer, assigned);
+ }
+ }
+ return assigned;
+ }
+
private TaskInfo assign(
MutableStoreProvider storeProvider,
Offer offer,
- Set<String> requestedPorts,
String taskId) {
String host = offer.getHostname();
- Set<Integer> selectedPorts = Resources.from(offer).getPorts(requestedPorts.size());
- Preconditions.checkState(selectedPorts.size() == requestedPorts.size());
-
- final Iterator<String> names = requestedPorts.iterator();
- Map<String, Integer> portsByName = FluentIterable.from(selectedPorts)
- .uniqueIndex(input -> names.next());
-
IAssignedTask assigned = stateManager.assignTask(
storeProvider,
taskId,
host,
offer.getSlaveId(),
- portsByName);
+ task -> mapAndAssignResources(offer, task));
LOG.info(
"Offer on slave {} (id {}) is being assigned task for {}.",
host, offer.getSlaveId().getValue(), taskId);
@@ -158,7 +161,6 @@ public interface TaskAssigner {
TaskInfo taskInfo = assign(
storeProvider,
offer.getOffer(),
- resourceRequest.getRequestedPorts(),
taskId);
try {
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
index 7313279..98fe860 100644
--- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
@@ -298,6 +298,17 @@ public class ConfigurationManagerTest {
DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
}
+ @Test
+ public void testTaskLinks() throws Exception {
+ TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder();
+ builder.addToResources(namedPort("health"));
+ builder.unsetTaskLinks();
+
+ ITaskConfig populated =
+ DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+ assertEquals(ImmutableSet.of("health", "http"), populated.getTaskLinks().keySet());
+ }
+
private void expectTaskDescriptionException(String message) {
expectedException.expect(TaskDescriptionException.class);
expectedException.expectMessage(message);
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 6370a12..94a885f 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -103,18 +104,18 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ResourceSlot twoPorts = Resources.from(
Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81))).slot();
- ITaskConfig noPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
- .newBuilder()
- .setRequestedPorts(ImmutableSet.of()));
- ITaskConfig onePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
- .newBuilder()
- .setRequestedPorts(ImmutableSet.of("one")));
- ITaskConfig twoPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
- .newBuilder()
- .setRequestedPorts(ImmutableSet.of("one", "two")));
- ITaskConfig threePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
- .newBuilder()
- .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
+ ITaskConfig noPortTask = resetPorts(
+ makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+ ImmutableSet.of());
+ ITaskConfig onePortTask = resetPorts(
+ makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+ ImmutableSet.of("one"));
+ ITaskConfig twoPortTask = resetPorts(
+ makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+ ImmutableSet.of("one", "two"));
+ ITaskConfig threePortTask = resetPorts(
+ makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+ ImmutableSet.of("one", "two", "three"));
Set<Veto> none = ImmutableSet.of();
IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index bf18d5d..ad397c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -66,6 +66,7 @@ import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXEC
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG;
import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosRangeResource;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -187,8 +188,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
@Test
public void testCreateFromPortsUnset() {
AssignedTask builder = TASK.newBuilder();
- builder.getTask().unsetRequestedPorts();
builder.unsetAssignedPorts();
+ builder.setTask(
+ resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder());
IAssignedTask assignedTask = IAssignedTask.build(builder);
expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER);
taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO);
@@ -353,8 +355,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
public void testPopulateDiscoveryInfoNoPort() {
config = new ExecutorSettings(THERMOS_CONFIG, true);
AssignedTask builder = TASK.newBuilder();
- builder.getTask().unsetRequestedPorts();
builder.unsetAssignedPorts();
+ builder.setTask(
+ resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder());
IAssignedTask assignedTask = IAssignedTask.build(builder);
expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER);
taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO);
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
index d5f2172..36c5c11 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
@@ -29,6 +29,9 @@ import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Resource;
import org.junit.Test;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -51,10 +54,10 @@ public class AcceptedOfferTest {
@Test
public void testReservedPredicates() {
- Protos.Resource withRole = makeScalar(CPUS.getMesosName(), TEST_ROLE, false, 1.0);
+ Protos.Resource withRole = mesosScalar(CPUS, TEST_ROLE, false, 1.0);
assertTrue(AcceptedOffer.RESERVED.apply(withRole));
assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole));
- Protos.Resource absentRole = makeScalar(CPUS.getMesosName(), ABSENT_ROLE, false, 1.0);
+ Protos.Resource absentRole = mesosScalar(CPUS, ABSENT_ROLE, false, 1.0);
assertFalse(AcceptedOffer.RESERVED.apply(absentRole));
assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole));
}
@@ -62,7 +65,7 @@ public class AcceptedOfferTest {
@Test
public void testAllocateEmpty() {
AcceptedOffer acceptedOffer = AcceptedOffer.create(
- fakeOffer(Collections.emptyList()),
+ offer(),
ResourceSlot.NONE,
ResourceSlot.NONE,
ImmutableSet.of(),
@@ -73,12 +76,10 @@ public class AcceptedOfferTest {
@Test
public void testAllocateRange() {
- List<Resource> resources = ImmutableList.<Resource>builder()
- .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93))
- .add(makePortResource(TEST_ROLE, 100, 101))
- .build();
AcceptedOffer acceptedOffer = AcceptedOffer.create(
- fakeOffer(resources),
+ offer(
+ mesosRange(PORTS, Optional.absent(), 80, 81, 90, 91, 92, 93),
+ mesosRange(PORTS, TEST_ROLE, 100, 101)),
ResourceSlot.NONE,
ResourceSlot.NONE,
ImmutableSet.of(80, 90, 100),
@@ -87,8 +88,8 @@ public class AcceptedOfferTest {
List<Resource> expected = ImmutableList.<Resource>builder()
// Because we prefer reserved resources and handle them before non-reserved resources,
// result should have ports for the reserved resources first.
- .add(makePortResource(TEST_ROLE, 100))
- .add(makePortResource(Optional.absent(), 80, 90))
+ .add(mesosRange(PORTS, TEST_ROLE, 100))
+ .add(mesosRange(PORTS, Optional.absent(), 80, 90))
.build();
assertEquals(expected, acceptedOffer.getTaskResources());
assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
@@ -96,11 +97,10 @@ public class AcceptedOfferTest {
@Test(expected = Resources.InsufficientResourcesException.class)
public void testAllocateRangeInsufficent() {
- List<Resource> resources = ImmutableList.of(
- makePortResource(ABSENT_ROLE, 80),
- makePortResource(ABSENT_ROLE, 100, 101));
AcceptedOffer.create(
- fakeOffer(resources),
+ offer(
+ mesosRange(PORTS, ABSENT_ROLE, 80),
+ mesosRange(PORTS, ABSENT_ROLE, 100, 101)),
ResourceSlot.NONE,
ResourceSlot.NONE,
ImmutableSet.of(80, 90, 100),
@@ -116,52 +116,43 @@ public class AcceptedOfferTest {
}
private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) {
- List<Resource> resources = ImmutableList.<Resource>builder()
- .add(makeScalar(
- CPUS.getMesosName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus()))
- .add(makeScalar(
- RAM_MB.getMesosName(), role, false, TOTAL_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(role, TASK_PORTS))
- .build();
- Protos.Offer offer = fakeOffer(resources);
+ Protos.Offer offer = offer(
+ mesosScalar(CPUS, role, cpuRevocable, TOTAL_SLOT.getNumCpus()),
+ mesosScalar(RAM_MB, role, false, TOTAL_SLOT.getRam().as(Data.MB)),
+ mesosScalar(DISK_MB, role, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+ mesosRange(PORTS, role, TASK_PORTS));
AcceptedOffer offerAllocation = AcceptedOffer.create(
offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
List<Resource> taskList = ImmutableList.<Resource>builder()
- .add(makeScalar(CPUS.getMesosName(), role, cpuRevocable, TASK_SLOT.getNumCpus()))
- .add(makeScalar(RAM_MB.getMesosName(), role, false, TASK_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), role, false, TASK_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(role, TASK_PORTS))
+ .add(mesosScalar(CPUS, role, cpuRevocable, TASK_SLOT.getNumCpus()))
+ .add(mesosScalar(RAM_MB, role, false, TASK_SLOT.getRam().as(Data.MB)))
+ .add(mesosScalar(
+ DISK_MB, role, false, TASK_SLOT.getDisk().as(Data.MB)))
+ .add(mesosRange(PORTS, role, TASK_PORTS))
.build();
assertEquals(taskList, offerAllocation.getTaskResources());
List<Resource> executorList = ImmutableList.<Resource>builder()
- .add(makeScalar(
- CPUS.getMesosName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
- .add(makeScalar(
- RAM_MB.getMesosName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+ .add(mesosScalar(
+ CPUS, role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(mesosScalar(
+ RAM_MB, role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+ .add(mesosScalar(
+ DISK_MB, role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
.build();
assertEquals(executorList, offerAllocation.getExecutorResources());
}
@Test(expected = Resources.InsufficientResourcesException.class)
public void testAllocateSingleRoleInsufficient() {
- List<Resource> resources = ImmutableList.<Resource>builder()
+ Protos.Offer offer = offer(
// EXECUTOR_SLOT's CPU is not included here.
- .add(makeScalar(CPUS.getMesosName(), TEST_ROLE, false, TASK_SLOT.getNumCpus()))
- .add(makeScalar(
- RAM_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(TEST_ROLE, TASK_PORTS))
- .build();
- Protos.Offer offer = fakeOffer(resources);
+ mesosScalar(CPUS, TEST_ROLE, false, TASK_SLOT.getNumCpus()),
+ mesosScalar(RAM_MB, TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
+ mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+ mesosRange(PORTS, TEST_ROLE, TASK_PORTS));
AcceptedOffer.create(
offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
@@ -174,136 +165,64 @@ public class AcceptedOfferTest {
}
private void runMultipleRoles(boolean cpuRevocable) {
- List<Resource> resources = ImmutableList.<Resource>builder()
+ Protos.Offer offer = offer(
// Make cpus come from two roles.
- .add(makeScalar(
- CPUS.getMesosName(),
- TEST_ROLE,
- cpuRevocable,
- EXECUTOR_SLOT.getNumCpus()))
- .add(makeScalar(
- CPUS.getMesosName(),
- ABSENT_ROLE,
- cpuRevocable,
- TASK_SLOT.getNumCpus()))
+ mesosScalar(CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()),
+ mesosScalar(CPUS, ABSENT_ROLE, cpuRevocable, TASK_SLOT.getNumCpus()),
// Make ram come from default role
- .add(makeScalar(
- RAM_MB.getMesosName(),
- ABSENT_ROLE,
- false,
- TOTAL_SLOT.getRam().as(Data.MB)))
+ mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
// Make disk come from non-default role.
- .add(makeScalar(
- DISK_MB.getMesosName(),
- TEST_ROLE,
- false,
- TOTAL_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(TEST_ROLE, 80))
- .add(makePortResource(ABSENT_ROLE, 90))
- .build();
-
- Protos.Offer offer = fakeOffer(resources);
+ mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+ mesosRange(PORTS, TEST_ROLE, 80),
+ mesosRange(PORTS, ABSENT_ROLE, 90));
AcceptedOffer offerAllocation = AcceptedOffer.create(
offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
List<Resource> taskList = ImmutableList.<Resource>builder()
// We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus.
- .add(makeScalar(
- CPUS.getMesosName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
- .add(makeScalar(
- CPUS.getMesosName(),
+ .add(mesosScalar(
+ CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(mesosScalar(
+ CPUS,
ABSENT_ROLE,
cpuRevocable,
TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus()))
- .add(makeScalar(
- RAM_MB.getMesosName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(TEST_ROLE, 80))
- .add(makePortResource(ABSENT_ROLE, 90))
+ .add(mesosScalar(
+ RAM_MB, ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
+ .add(mesosScalar(
+ DISK_MB, TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
+ .add(mesosRange(PORTS, TEST_ROLE, 80))
+ .add(mesosRange(PORTS, ABSENT_ROLE, 90))
.build();
assertEquals(taskList, offerAllocation.getTaskResources());
List<Resource> executorList = ImmutableList.<Resource>builder()
- .add(makeScalar(
- CPUS.getMesosName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
- .add(makeScalar(
- RAM_MB.getMesosName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
- .add(makeScalar(
- DISK_MB.getMesosName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+ .add(mesosScalar(
+ CPUS, ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+ .add(mesosScalar(
+ RAM_MB, ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+ .add(mesosScalar(
+ DISK_MB, TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
.build();
assertEquals(executorList, offerAllocation.getExecutorResources());
}
@Test(expected = Resources.InsufficientResourcesException.class)
public void testMultipleRolesInsufficient() {
- // Similar to testMultipleRoles, but make some of cpus as revocable
- List<Resource> resources = ImmutableList.<Resource>builder()
+ Protos.Offer offer = offer(
+ // Similar to testMultipleRoles, but make some of cpus as revocable
// Make cpus come from two roles.
- .add(makeScalar(
- CPUS.getMesosName(),
- TEST_ROLE,
- true,
- EXECUTOR_SLOT.getNumCpus()))
- .add(makeScalar(
- CPUS.getMesosName(),
- ABSENT_ROLE,
- false,
- TASK_SLOT.getNumCpus()))
+ mesosScalar(CPUS, TEST_ROLE, true, EXECUTOR_SLOT.getNumCpus()),
+ mesosScalar(CPUS, ABSENT_ROLE, false, TASK_SLOT.getNumCpus()),
// Make ram come from default role
- .add(makeScalar(
- RAM_MB.getMesosName(),
- ABSENT_ROLE,
- false,
- TOTAL_SLOT.getRam().as(Data.MB)))
+ mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
// Make disk come from non-default role.
- .add(makeScalar(
- DISK_MB.getMesosName(),
- TEST_ROLE,
- false,
- TOTAL_SLOT.getDisk().as(Data.MB)))
- .add(makePortResource(TEST_ROLE, 80))
- .add(makePortResource(ABSENT_ROLE, 90))
- .build();
- Protos.Offer offer = fakeOffer(resources);
+ mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+ mesosRange(PORTS, TEST_ROLE, 80),
+ mesosRange(PORTS, ABSENT_ROLE, 90));
// We don't have enough resource to satisfy a non-revocable request.
AcceptedOffer.create(
offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
}
-
- private static Resource makePortResource(Optional<String> role, Integer... values) {
- Resource.Builder prototype = Resource.newBuilder()
- .setType(Protos.Value.Type.RANGES)
- .setName(PORTS.getMesosName());
- if (role.isPresent()) {
- prototype.setRole(role.get());
- }
- return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values));
- }
-
- private static Resource makeScalar(
- String name, Optional<String> role, boolean revocable, double value) {
- Resource.Builder resource = Resource.newBuilder()
- .setName(name)
- .setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
- if (role.isPresent()) {
- resource.setRole(role.get());
- }
- if (revocable) {
- resource.setRevocable(Resource.RevocableInfo.getDefaultInstance());
- }
- return resource.build();
- }
-
- private static Protos.Offer fakeOffer(List<Resource> resources) {
- return Protos.Offer.newBuilder()
- .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
- .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
- .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
- .setHostname("hostname")
- .addAllResources(resources)
- .build();
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
new file mode 100644
index 0000000..c94f7a9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.junit.Assert.assertEquals;
+
+public class PortMapperTest {
+ @Test
+ public void testAssignNoPorts() {
+ ScheduledTask builder = makeTask("id", JOB).newBuilder();
+ builder.getAssignedTask().getTask().unsetResources();
+ builder.getAssignedTask().unsetAssignedPorts();
+ IScheduledTask task = IScheduledTask.build(builder);
+
+ assertEquals(task, PORT_MAPPER.mapAndAssign(offer(), task));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testPortRangeScarcity() {
+ PORT_MAPPER.mapAndAssign(offer(), makeTask("id", JOB));
+ }
+
+ @Test
+ public void testPortRangeAbundance() {
+ Protos.Offer offer = offer(mesosRange(PORTS, 1, 2, 3, 4, 5));
+ assertEquals(
+ 1,
+ PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB))
+ .getAssignedTask().getAssignedPorts().size());
+ }
+
+ @Test
+ public void testPortRangeExact() {
+ Protos.Offer offer = offer(mesosRange(PORTS, 1));
+ assertEquals(
+ 1,
+ PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB))
+ .getAssignedTask().getAssignedPorts().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
new file mode 100644
index 0000000..b6810b1
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.EnumSet;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Value.Scalar;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.apache.mesos.Protos.Value.Type.SCALAR;
+import static org.junit.Assert.assertEquals;
+
+public class ResourceManagerTest {
+ @Test
+ public void testGetOfferResources() {
+ Protos.Resource resource1 = Protos.Resource.newBuilder()
+ .setType(SCALAR)
+ .setName(CPUS.getMesosName())
+ .setScalar(Scalar.newBuilder().setValue(2.0).build())
+ .build();
+
+ Protos.Resource resource2 = Protos.Resource.newBuilder()
+ .setType(SCALAR)
+ .setName(RAM_MB.getMesosName())
+ .setScalar(Scalar.newBuilder().setValue(64).build())
+ .build();
+
+ Offer offer = Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+ .setHostname("hostname")
+ .addAllResources(ImmutableSet.of(resource1, resource2)).build();
+
+ assertEquals(
+ resource1,
+ Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, CPUS)));
+ assertEquals(
+ resource2,
+ Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, RAM_MB)));
+ }
+
+ @Test
+ public void testGetTaskResources() {
+ assertEquals(
+ IResource.build(numCpus(1.0)),
+ Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), CPUS)));
+ assertEquals(
+ IResource.build(namedPort("http")),
+ Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), PORTS)));
+ }
+
+ @Test
+ public void testGetTaskResourceTypes() {
+ ScheduledTask builder = makeTask("id", JOB).newBuilder();
+ builder.getAssignedTask().getTask().addToResources(namedPort("health"));
+
+ assertEquals(
+ EnumSet.allOf(ResourceType.class),
+ ResourceManager.getTaskResourceTypes(IScheduledTask.build(builder)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
index 04a8238..1583cef 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
@@ -13,14 +13,26 @@
*/
package org.apache.aurora.scheduler.resources;
+import java.util.Set;
+
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.aurora.gen.Resource;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Numbers;
+import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos;
import static org.apache.aurora.gen.Resource.diskMb;
import static org.apache.aurora.gen.Resource.numCpus;
import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.fromResource;
/**
* Convenience methods for working with resources.
@@ -45,4 +57,77 @@ public final class ResourceTestUtil {
.setRamMb(ramMb)
.setDiskMb(diskMb));
}
+
+ public static ITaskConfig resetPorts(ITaskConfig config, Set<String> portNames) {
+ TaskConfig builder = config.newBuilder()
+ .setRequestedPorts(portNames);
+ builder.getResources().removeIf(e -> fromResource(IResource.build(e)).equals(PORTS));
+ portNames.forEach(e -> builder.addToResources(Resource.namedPort(e)));
+ return ITaskConfig.build(builder);
+ }
+
+ public static Protos.Resource mesosScalar(ResourceType type, double value) {
+ return mesosScalar(type, Optional.absent(), false, value);
+ }
+
+ public static Protos.Resource mesosScalar(ResourceType type, double value, boolean revocable) {
+ return mesosScalar(type, Optional.absent(), revocable, value);
+ }
+
+ public static Protos.Resource mesosScalar(
+ ResourceType type,
+ Optional<String> role,
+ boolean revocable,
+ double value) {
+
+ return resourceBuilder(type, role, revocable)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(value).build())
+ .build();
+ }
+
+ public static Protos.Resource mesosRange(ResourceType type, Integer... values) {
+ return mesosRange(type, Optional.absent(), values);
+ }
+
+ public static Protos.Resource mesosRange(
+ ResourceType type,
+ Optional<String> role,
+ Integer... values) {
+
+ return resourceBuilder(type, role, false)
+ .setRanges(Protos.Value.Ranges.newBuilder().addAllRange(
+ Iterables.transform(
+ Numbers.toRanges(ImmutableSet.copyOf(values)),
+ ResourceSlot.RANGE_TRANSFORM)))
+ .build();
+ }
+
+ public static Protos.Offer offer(Protos.Resource... resources) {
+ return Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+ .setHostname("hostname")
+ .addAllResources(ImmutableSet.copyOf(resources)).build();
+ }
+
+ private static Protos.Resource.Builder resourceBuilder(
+ ResourceType type,
+ Optional<String> role,
+ boolean revocable) {
+
+ Protos.Resource.Builder builder = Protos.Resource.newBuilder()
+ .setType(type.getMesosType())
+ .setName(type.getMesosName());
+
+ if (revocable) {
+ builder.setRevocable(Protos.Resource.RevocableInfo.getDefaultInstance());
+ }
+
+ if (role.isPresent()) {
+ builder.setRole(role.get());
+ }
+
+ return builder;
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
index ceb81e4..185338e 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
@@ -13,164 +13,64 @@
*/
package org.apache.aurora.scheduler.resources;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.collections.Pair;
import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.scheduler.resources.Resources.InsufficientResourcesException;
import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.Value.Range;
-import org.apache.mesos.Protos.Value.Ranges;
import org.junit.Test;
import static org.apache.aurora.common.quantity.Data.MB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
-import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosResource;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-import static org.apache.mesos.Protos.Value.Type.RANGES;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
public class ResourcesTest {
@Test
- public void testPortRangeExact() {
- Resource portsResource = createPortRange(Pair.of(1, 5));
- Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
- assertEquals(5, ports.size());
- }
-
- @Test
- public void testOnePortAvailable() {
- Resource portsResource = createPortRange(Pair.of(3, 3));
- Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(1);
- assertEquals(1, ports.size());
- }
-
- @Test
- public void testPortRangeAbundance() {
- Resource portsResource = createPortRange(Pair.of(1, 10));
- Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
- assertEquals(5, ports.size());
- }
-
- @Test
- public void testPortRangeExhaust() {
- Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15));
-
- Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(7);
- assertEquals(7, ports.size());
-
- ports = Resources.from(createOffer(portsResource)).getPorts(8);
- assertEquals(8, ports.size());
-
- try {
- Resources.from(createOffer(portsResource)).getPorts(9);
- fail("Ports should not have been sufficient");
- } catch (InsufficientResourcesException e) {
- // Expected.
- }
- }
-
- @Test
- public void testGetNoPorts() {
- Resource portsResource = createPortRange(Pair.of(1, 5));
- assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0));
- }
-
- @Test(expected = Resources.InsufficientResourcesException.class)
- public void testPortRangeScarcity() {
- Resource portsResource = createPortRange(Pair.of(1, 2));
- Resources.from(createOffer(portsResource)).getPorts(5);
- }
-
- @Test
public void testGetSlot() {
- ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
- .add(makeMesosResource(CPUS, 8.0, false))
- .add(makeMesosResource(RAM_MB, 1024, false))
- .add(makeMesosResource(DISK_MB, 2048, false))
- .add(createPortRange(Pair.of(1, 10)))
- .build();
-
- ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10);
- assertEquals(expected, Resources.from(createOffer(resources)).slot());
+ Protos.Offer offer = offer(
+ mesosScalar(CPUS, 8.0, false),
+ mesosScalar(RAM_MB, 1024, false),
+ mesosScalar(DISK_MB, 2048, false),
+ mesosRange(PORTS, 1, 2, 3));
+
+ ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 3);
+ assertEquals(expected, Resources.from(offer).slot());
}
@Test
public void testMissingResourcesHandledGracefully() {
- ImmutableList<Resource> resources = ImmutableList.<Resource>builder().build();
- assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot());
+ assertEquals(ResourceSlot.NONE, Resources.from(offer()).slot());
}
@Test
public void testFilter() {
- ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
- .add(makeMesosResource(CPUS, 8.0, true))
- .add(makeMesosResource(RAM_MB, 1024, false))
- .build();
+ Protos.Offer offer = offer(
+ mesosScalar(CPUS, 8.0, true),
+ mesosScalar(RAM_MB, 1024, false));
assertEquals(
new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
- Resources.from(createOffer(resources)).filter(Resources.REVOCABLE).slot());
+ Resources.from(offer).filter(Resources.REVOCABLE).slot());
}
@Test
public void testFilterByTier() {
- ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
- .add(makeMesosResource(CPUS, 8.0, true))
- .add(makeMesosResource(CPUS, 8.0, false))
- .add(makeMesosResource(RAM_MB, 1024, false))
- .build();
+ Protos.Offer offer = offer(
+ mesosScalar(CPUS, 8.0, true),
+ mesosScalar(CPUS, 8.0, false),
+ mesosScalar(RAM_MB, 1024, false));
assertEquals(
new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
- Resources.from(createOffer(resources)).filter(REVOCABLE_TIER).slot());
+ Resources.from(offer).filter(REVOCABLE_TIER).slot());
assertEquals(
new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
- Resources.from(createOffer(resources)).filter(DEV_TIER).slot());
- }
-
- private Resource createPortRange(Pair<Integer, Integer> range) {
- return createPortRanges(ImmutableSet.of(range));
- }
-
- private Resource createPortRanges(Pair<Integer, Integer> rangeA, Pair<Integer, Integer> rangeB) {
- return createPortRanges(
- ImmutableSet.<Pair<Integer, Integer>>builder().add(rangeA).add(rangeB).build());
- }
-
- private Resource createPortRanges(Set<Pair<Integer, Integer>> ports) {
- Ranges.Builder ranges = Ranges.newBuilder();
- for (Pair<Integer, Integer> range : ports) {
- ranges.addRange(Range.newBuilder().setBegin(range.getFirst()).setEnd(range.getSecond()));
- }
-
- return Resource.newBuilder()
- .setName(PORTS.getMesosName())
- .setType(RANGES)
- .setRanges(ranges)
- .build();
- }
-
- private static Protos.Offer createOffer(Resource resource) {
- return createOffer(ImmutableList.of(resource));
- }
-
- private static Protos.Offer createOffer(Iterable<Resource> resources) {
- return Protos.Offer.newBuilder()
- .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
- .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
- .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
- .setHostname("hostname")
- .addAllResources(resources).build();
+ Resources.from(offer).filter(DEV_TIER).slot());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 498da78..2370178 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -17,6 +17,8 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -44,6 +46,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -69,6 +72,8 @@ import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL;
import static org.apache.aurora.scheduler.state.StateChangeResult.INVALID_CAS_STATE;
import static org.apache.aurora.scheduler.state.StateChangeResult.NOOP;
@@ -403,14 +408,10 @@ public class StateManagerImplTest extends EasyMockTest {
storeProvider -> stateManager.deleteTasks(storeProvider, ImmutableSet.of(taskId)));
}
- private static ITaskConfig setRequestedPorts(ITaskConfig config, Set<String> portNames) {
- return ITaskConfig.build(config.newBuilder().setRequestedPorts(portNames));
- }
-
@Test
public void testPortResource() throws Exception {
Set<String> requestedPorts = ImmutableSet.of("one", "two", "three");
- ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts);
+ ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts);
String taskId = "a";
expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
@@ -425,13 +426,15 @@ public class StateManagerImplTest extends EasyMockTest {
assertEquals(
requestedPorts,
- actual.getAssignedTask().getTask().getRequestedPorts());
+ StreamSupport.stream(ResourceManager.getTaskResources(actual, PORTS).spliterator(), false)
+ .map(e -> e.getNamedPort())
+ .collect(Collectors.toSet()));
}
@Test
public void testPortResourceResetAfterReschedule() throws Exception {
Set<String> requestedPorts = ImmutableSet.of("one");
- ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts);
+ ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts);
String taskId = "a";
expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
@@ -561,6 +564,10 @@ public class StateManagerImplTest extends EasyMockTest {
taskId,
host.getHost(),
SlaveID.newBuilder().setValue(host.getSlaveId()).build(),
- ports));
+ e -> {
+ ScheduledTask builder = e.newBuilder();
+ builder.getAssignedTask().setAssignedPorts(ports);
+ return IScheduledTask.build(builder);
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index a2df311..ca10323 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,8 +20,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduledTask;
@@ -56,41 +54,31 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TaskAssignerImplTest extends EasyMockTest {
- private static final int PORT = 5000;
- private static final String SLAVE_ID = "slaveId";
- private static final Offer MESOS_OFFER = Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue("offerId"))
- .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
- .setSlaveId(SlaveID.newBuilder().setValue(SLAVE_ID))
- .setHostname("hostName")
- .addResources(Resource.newBuilder()
- .setName("ports")
- .setType(Type.RANGES)
- .setRanges(
- Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
- .build();
+ private static final int PORT = 1000;
+ private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
+ private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue();
private static final HostOffer OFFER =
new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
- private static final String PORT_NAME = "http";
- private static final IScheduledTask TASK = IScheduledTask.build(
- new ScheduledTask()
- .setAssignedTask(new AssignedTask()
- .setTaskId("taskId")
- .setTask(new TaskConfig()
- .setJob(new JobKey("r", "e", "n"))
- .setExecutorConfig(new ExecutorConfig().setData("opaque data"))
- .setRequestedPorts(ImmutableSet.of(PORT_NAME)))));
+ private static final IScheduledTask TASK = makeTask("id", JOB);
private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
@@ -108,7 +96,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
private SchedulingFilter filter;
private MesosTaskFactory taskFactory;
private OfferManager offerManager;
- private TaskAssigner assigner;
+ private TaskAssignerImpl assigner;
private TierManager tierManager;
@Before
@@ -128,13 +116,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
- expect(stateManager.assignTask(
- storeProvider,
- Tasks.id(TASK),
- MESOS_OFFER.getHostname(),
- MESOS_OFFER.getSlaveId(),
- ImmutableMap.of(PORT_NAME, PORT)))
- .andReturn(TASK.getAssignedTask());
+ expectAssignTask(MESOS_OFFER);
expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
.andReturn(TASK_INFO);
@@ -190,13 +172,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
- expect(stateManager.assignTask(
- storeProvider,
- Tasks.id(TASK),
- MESOS_OFFER.getHostname(),
- MESOS_OFFER.getSlaveId(),
- ImmutableMap.of(PORT_NAME, PORT)))
- .andReturn(TASK.getAssignedTask());
+ expectAssignTask(MESOS_OFFER);
expect(stateManager.changeState(
storeProvider,
Tasks.id(TASK),
@@ -254,13 +230,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
- expect(stateManager.assignTask(
- storeProvider,
- Tasks.id(TASK),
- offer.getOffer().getHostname(),
- offer.getOffer().getSlaveId(),
- ImmutableMap.of(PORT_NAME, PORT)))
- .andReturn(TASK.getAssignedTask());
+ expectAssignTask(offer.getOffer());
expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
.andReturn(TASK_INFO);
offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
@@ -306,13 +276,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
.andReturn(ImmutableSet.of());
- expect(stateManager.assignTask(
- storeProvider,
- Tasks.id(TASK),
- OFFER.getOffer().getHostname(),
- OFFER.getOffer().getSlaveId(),
- ImmutableMap.of(PORT_NAME, PORT)))
- .andReturn(TASK.getAssignedTask());
+ expectAssignTask(MESOS_OFFER);
expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
.andReturn(TASK_INFO);
offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
@@ -326,4 +290,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
Tasks.id(TASK),
ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
}
+
+ @Test
+ public void testResourceMapperCallback() {
+ ScheduledTask builder = TASK.newBuilder();
+ builder.getAssignedTask().unsetAssignedPorts();
+
+ control.replay();
+
+ assertEquals(TASK, assigner.mapAndAssignResources(MESOS_OFFER, IScheduledTask.build(builder)));
+ }
+
+ private void expectAssignTask(Offer offer) {
+ expect(stateManager.assignTask(
+ eq(storeProvider),
+ eq(Tasks.id(TASK)),
+ eq(offer.getHostname()),
+ eq(offer.getSlaveId()),
+ anyObject())).andReturn(TASK.getAssignedTask());
+ }
}