You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/05/02 18:05:01 UTC

aurora git commit: Generalizing port resource management.

Repository: aurora
Updated Branches:
  refs/heads/master 450d88156 -> d8d1c8d68


Generalizing port resource management.

Reviewed at https://reviews.apache.org/r/46810/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d8d1c8d6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d8d1c8d6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d8d1c8d6

Branch: refs/heads/master
Commit: d8d1c8d6889b39c5da0ca95496ef8bda7827fb4a
Parents: 450d881
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon May 2 09:04:54 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon May 2 09:04:54 2016 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationManager.java     |  19 +-
 .../scheduler/filter/SchedulingFilter.java      |   4 -
 .../scheduler/resources/ResourceManager.java    |  81 +++++++
 .../scheduler/resources/ResourceMapper.java     |  84 ++++++++
 .../scheduler/resources/ResourceSlot.java       |   3 +-
 .../scheduler/resources/ResourceType.java       |  43 +++-
 .../aurora/scheduler/resources/Resources.java   |  31 ---
 .../aurora/scheduler/state/StateManager.java    |   7 +-
 .../scheduler/state/StateManagerImpl.java       |  13 +-
 .../aurora/scheduler/state/TaskAssigner.java    |  28 +--
 .../configuration/ConfigurationManagerTest.java |  11 +
 .../filter/SchedulingFilterImplTest.java        |  25 +--
 .../mesos/MesosTaskFactoryImplTest.java         |   7 +-
 .../scheduler/resources/AcceptedOfferTest.java  | 213 ++++++-------------
 .../scheduler/resources/PortMapperTest.java     |  62 ++++++
 .../resources/ResourceManagerTest.java          |  88 ++++++++
 .../scheduler/resources/ResourceTestUtil.java   |  85 ++++++++
 .../scheduler/resources/ResourcesTest.java      | 144 ++-----------
 .../scheduler/state/StateManagerImplTest.java   |  23 +-
 .../scheduler/state/TaskAssignerImplTest.java   |  89 ++++----
 20 files changed, 645 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 9a15a4b..e1ce638 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -38,10 +38,12 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.UserProvidedStrings;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IContainer;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
@@ -49,6 +51,8 @@ import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+
 /**
  * Manages translation from a string-mapped configuration to a concrete configuration type, and
  * defaults for optional values.
@@ -224,8 +228,6 @@ public class ConfigurationManager {
       builder.setRequestedPorts(ImmutableSet.of());
     }
 
-    maybeFillLinks(builder);
-
     if (config.isSetTier() && !UserProvidedStrings.isGoodIdentifier(config.getTier())) {
       throw new TaskDescriptionException("Tier contains illegal characters: " + config.getTier());
     }
@@ -327,6 +329,8 @@ public class ConfigurationManager {
       throw new TaskDescriptionException("Multiple resource values are not supported for " + types);
     }
 
+    maybeFillLinks(builder);
+
     return ITaskConfig.build(builder);
   }
 
@@ -343,11 +347,12 @@ public class ConfigurationManager {
   private static void maybeFillLinks(TaskConfig task) {
     if (task.getTaskLinksSize() == 0) {
       ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
-      if (task.getRequestedPorts().contains("health")) {
-        links.put("health", "http://%host%:%port:health%");
-      }
-      if (task.getRequestedPorts().contains("http")) {
-        links.put("http", "http://%host%:%port:http%");
+      for (IResource resource : ResourceManager.getTaskResources(ITaskConfig.build(task), PORTS)) {
+        if (resource.getNamedPort().equals("health")) {
+          links.put("health", "http://%host%:%port:health%");
+        } else if (resource.getNamedPort().equals("http")) {
+          links.put("http", "http://%host%:%port:http%");
+        }
       }
       task.setTaskLinks(links.build());
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 625e6d5..1ee2cfa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -303,10 +303,6 @@ public interface SchedulingFilter {
       return jobState;
     }
 
-    public Set<String> getRequestedPorts() {
-      return task.getRequestedPorts();
-    }
-
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ResourceRequest)) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
new file mode 100644
index 0000000..8b42bf0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.Resource;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * Manages resources and provides Aurora/Mesos translation.
+ */
+public final class ResourceManager {
+  private ResourceManager() {
+    // Utility class.
+  }
+
+  /**
+   * Gets offer resources matching specified {@link ResourceType}.
+   *
+   * @param offer Offer to get resources from.
+   * @param type {@link ResourceType} to filter resources by.
+   * @return Offer resources matching {@link ResourceType}.
+   */
+  public static Iterable<Resource> getOfferResources(Offer offer, ResourceType type) {
+    return Iterables.filter(offer.getResourcesList(), r -> r.getName().equals(type.getMesosName()));
+  }
+
+  /**
+   * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}.
+   *
+   * @param task Scheduled task to get resources from.
+   * @param type {@link ResourceType} to filter resources by.
+   * @return Task resources matching {@link ResourceType}.
+   */
+  public static Iterable<IResource> getTaskResources(IScheduledTask task, ResourceType type) {
+    return getTaskResources(task.getAssignedTask().getTask(), type);
+  }
+
+  /**
+   * Gets task resources matching specified {@link ResourceType}.
+   *
+   * @param task Task config to get resources from.
+   * @param type {@link ResourceType} to filter resources by.
+   * @return Task resources matching {@link ResourceType}.
+   */
+  public static Iterable<IResource> getTaskResources(ITaskConfig task, ResourceType type) {
+    return Iterables.filter(task.getResources(), r -> ResourceType.fromResource(r).equals(type));
+  }
+
+  /**
+   * Gets unique task resource types. Collects directly into an {@link EnumSet} so that a task
+   * without resources yields an empty set (EnumSet.copyOf rejects empty non-EnumSet input).
+   *
+   * @param task Task to get resource types from.
+   * @return Set of {@link ResourceType} instances representing task resources, possibly empty.
+   */
+  public static Set<ResourceType> getTaskResourceTypes(IScheduledTask task) {
+    return task.getAssignedTask().getTask().getResources().stream()
+        .map(r -> ResourceType.fromResource(r))
+        .collect(Collectors.toCollection(() -> EnumSet.noneOf(ResourceType.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
new file mode 100644
index 0000000..c06ce8d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos.Offer;
+
+import static java.util.stream.StreamSupport.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.DiscreteDomain.integers;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+
+/**
+ * Maps requested (task) resources to available (offer) resources.
+ */
+public interface ResourceMapper {
+  /**
+   * Maps task resources to offer resources and returns a new task with updated mapping.
+   *
+   * @param offer Offer with available resources.
+   * @param task Task with requested resources.
+   * @return A new task with updated mapping.
+   */
+  IScheduledTask mapAndAssign(Offer offer, IScheduledTask task);
+
+  PortMapper PORT_MAPPER = new PortMapper();
+
+  /** Assigns each named port requested by a task a distinct, randomly chosen offered port. */
+  class PortMapper implements ResourceMapper {
+    @Override
+    public IScheduledTask mapAndAssign(Offer offer, IScheduledTask task) {
+      List<Integer> availablePorts =
+          stream(ResourceManager.getOfferResources(offer, PORTS).spliterator(), false)
+              .flatMap(resource -> resource.getRanges().getRangeList().stream())
+              .flatMap(range -> ContiguousSet.create(
+                  Range.closed((int) range.getBegin(), (int) range.getEnd()),
+                  integers()).stream())
+              // Overlapping offer ranges must not hand the same port to two named ports.
+              .distinct()
+              .collect(Collectors.toList());
+      Collections.shuffle(availablePorts);
+
+      List<String> requestedPorts =
+          stream(ResourceManager.getTaskResources(task, PORTS).spliterator(), false)
+              .map(e -> e.getNamedPort())
+              .collect(Collectors.toList());
+
+      checkState(
+          availablePorts.size() >= requestedPorts.size(),
+          "Insufficient ports %s when matching %s", availablePorts.size(), task);
+
+      Iterator<Integer> ports = availablePorts.iterator();
+      Map<String, Integer> portMap =
+          requestedPorts.stream().collect(Collectors.toMap(key -> key, value -> ports.next()));
+      ScheduledTask builder = task.newBuilder();
+      builder.getAssignedTask().setAssignedPorts(ImmutableMap.copyOf(portMap));
+      return IScheduledTask.build(builder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
index 1df2c11..a8dee95 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java
@@ -43,6 +43,7 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.aurora.common.quantity.Data.BYTES;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 
 /**
@@ -94,7 +95,7 @@ public final class ResourceSlot {
         task.getNumCpus(),
         Amount.of(task.getRamMb(), Data.MB),
         Amount.of(task.getDiskMb(), Data.MB),
-        task.getRequestedPorts().size());
+        Iterables.size(ResourceManager.getTaskResources(task, PORTS)));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
index 6e4d694..ee2b51a 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.resources;
 
 import java.util.EnumSet;
+import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -29,6 +30,7 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.common.quantity.Data.GB;
 import static org.apache.aurora.common.quantity.Data.MB;
+import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER;
 import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.DOUBLE;
 import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.LONG;
 import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.STRING;
@@ -43,22 +45,38 @@ public enum ResourceType implements TEnum {
   /**
    * CPU resource.
    */
-  CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, "CPU", 16, false),
+  CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, Optional.empty(), "CPU", 16, false),
 
   /**
    * RAM resource.
    */
-  RAM_MB(_Fields.RAM_MB, SCALAR, "mem", LONG, "RAM", Amount.of(24, GB).as(MB), false),
+  RAM_MB(
+      _Fields.RAM_MB,
+      SCALAR,
+      "mem",
+      LONG,
+      Optional.empty(),
+      "RAM",
+      Amount.of(24, GB).as(MB),
+      false),
 
   /**
    * DISK resource.
    */
-  DISK_MB(_Fields.DISK_MB, SCALAR, "disk", LONG, "disk", Amount.of(450, GB).as(MB), false),
+  DISK_MB(
+      _Fields.DISK_MB,
+      SCALAR,
+      "disk",
+      LONG,
+      Optional.empty(),
+      "disk",
+      Amount.of(450, GB).as(MB),
+      false),
 
   /**
    * Port resource.
    */
-  PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, "ports", 1000, true);
+  PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, Optional.of(PORT_MAPPER), "ports", 1000, true);
 
   /**
    * Correspondent thrift {@link org.apache.aurora.gen.Resource} enum value.
@@ -81,6 +99,11 @@ public enum ResourceType implements TEnum {
   private final ResourceTypeConverter<?> typeConverter;
 
   /**
+   * Optional resource mapper to use.
+   */
+  private final Optional<ResourceMapper> mapper;
+
+  /**
    * Aurora resource name.
    */
   private final String auroraName;
@@ -105,6 +128,7 @@ public enum ResourceType implements TEnum {
    * @param mesosType See {@link #getMesosType()} for more details.
    * @param mesosName See {@link #getMesosName()} for more details.
    * @param typeConverter See {@link #getTypeConverter()} for more details.
+   * @param mapper See {@link #getMapper()} for more details.
    * @param auroraName See {@link #getAuroraName()} for more details.
    * @param scalingRange See {@link #getScalingRange()} for more details.
    * @param isMultipleAllowed See {@link #isMultipleAllowed()} for more details.
@@ -114,6 +138,7 @@ public enum ResourceType implements TEnum {
       Protos.Value.Type mesosType,
       String mesosName,
       ResourceTypeConverter<?> typeConverter,
+      Optional<ResourceMapper> mapper,
       String auroraName,
       int scalingRange,
       boolean isMultipleAllowed) {
@@ -123,6 +148,7 @@ public enum ResourceType implements TEnum {
     this.mesosName = requireNonNull(mesosName);
     this.typeConverter = requireNonNull(typeConverter);
     this.auroraName = requireNonNull(auroraName);
+    this.mapper = requireNonNull(mapper);
     this.scalingRange = scalingRange;
     this.isMultipleAllowed = isMultipleAllowed;
   }
@@ -171,6 +197,15 @@ public enum ResourceType implements TEnum {
   }
 
   /**
+   * Gets optional resource mapper. See {@link ResourceMapper} for more details.
+   *
+   * @return Optional ResourceMapper.
+   */
+  public Optional<ResourceMapper> getMapper() {
+    return mapper;
+  }
+
+  /**
    * Gets resource name for internal Aurora representation (e.g. in the UI).
    *
    * @return Aurora resource name.

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
index 894c6a6..36d1de8 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.resources;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Function;
@@ -23,10 +21,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
@@ -119,32 +114,6 @@ public final class Resources {
         getNumAvailablePorts());
   }
 
-  /**
-   * Attempts to grab {@code numPorts} from this resource instance.
-   *
-   * @param numPorts The number of ports to grab.
-   * @return The set of ports grabbed.
-   * @throws InsufficientResourcesException if not enough ports were available.
-   */
-  public Set<Integer> getPorts(int numPorts)
-      throws InsufficientResourcesException {
-
-    if (numPorts == 0) {
-      return ImmutableSet.of();
-    }
-
-    List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat(
-        Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS))));
-
-    if (availablePorts.size() < numPorts) {
-      throw new InsufficientResourcesException(
-          String.format("Could not get %d ports from %s", numPorts, availablePorts));
-    }
-
-    Collections.shuffle(availablePorts);
-    return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
-  }
-
   private int getNumAvailablePorts() {
     int offeredPorts = 0;
     for (Range range : getPortRanges()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 5d34fe3..66bfd72 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -13,13 +13,14 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import com.google.common.base.Optional;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
@@ -62,7 +63,7 @@ public interface StateManager {
    * @param taskId ID of the task to mutate.
    * @param slaveHost Host name that the task is being assigned to.
    * @param slaveId ID of the slave that the task is being assigned to.
-   * @param assignedPorts Ports on the host that are being assigned to the task.
+   * @param resourceAssigner The resource assign operation.
    * @return The updated task record, or {@code null} if the task was not found.
    */
   IAssignedTask assignTask(
@@ -70,7 +71,7 @@ public interface StateManager {
       String taskId,
       String slaveHost,
       SlaveID slaveId,
-      Map<String, Integer> assignedPorts);
+      Function<IScheduledTask, IScheduledTask> resourceAssigner);
 
   /**
    * Inserts pending instances using {@code task} as their configuration. Tasks will immediately

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index e5b2f41..2b4fac1 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -18,11 +18,11 @@ import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
@@ -165,18 +165,18 @@ public class StateManagerImpl implements StateManager {
       String taskId,
       String slaveHost,
       SlaveID slaveId,
-      Map<String, Integer> assignedPorts) {
+      Function<IScheduledTask, IScheduledTask> resourceAssigner) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
     requireNonNull(slaveId);
-    requireNonNull(assignedPorts);
+    requireNonNull(resourceAssigner);
 
     IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId,
         task -> {
+          task = resourceAssigner.apply(task);
           ScheduledTask builder = task.newBuilder();
           builder.getAssignedTask()
-              .setAssignedPorts(assignedPorts)
               .setSlaveHost(slaveHost)
               .setSlaveId(slaveId.getValue());
           return IScheduledTask.build(builder);
@@ -231,9 +231,6 @@ public class StateManagerImpl implements StateManager {
         transitionMessage);
   }
 
-  private static final Function<SideEffect, Action> GET_ACTION =
-      SideEffect::getAction;
-
   private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
       Action.INCREMENT_FAILURES,
       Action.SAVE_STATE,
@@ -252,7 +249,7 @@ public class StateManagerImpl implements StateManager {
   // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
   // (thus not carrying forward the failure increment).
   private static final Ordering<SideEffect> ACTION_ORDER =
-      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(SideEffect::getAction);
 
   private StateChangeResult updateTaskAndExternalState(
       TaskStore.Mutable taskStore,

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7d43d4a..1a3886f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -22,8 +21,6 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
@@ -38,8 +35,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.resources.Resources;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,26 +103,29 @@ public interface TaskAssigner {
       this.tierManager = requireNonNull(tierManager);
     }
 
+    @VisibleForTesting
+    IScheduledTask mapAndAssignResources(Offer offer, IScheduledTask task) {
+      IScheduledTask assigned = task;
+      for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
+        if (type.getMapper().isPresent()) {
+          assigned = type.getMapper().get().mapAndAssign(offer, assigned);
+        }
+      }
+      return assigned;
+    }
+
     private TaskInfo assign(
         MutableStoreProvider storeProvider,
         Offer offer,
-        Set<String> requestedPorts,
         String taskId) {
 
       String host = offer.getHostname();
-      Set<Integer> selectedPorts = Resources.from(offer).getPorts(requestedPorts.size());
-      Preconditions.checkState(selectedPorts.size() == requestedPorts.size());
-
-      final Iterator<String> names = requestedPorts.iterator();
-      Map<String, Integer> portsByName = FluentIterable.from(selectedPorts)
-          .uniqueIndex(input -> names.next());
-
       IAssignedTask assigned = stateManager.assignTask(
           storeProvider,
           taskId,
           host,
           offer.getSlaveId(),
-          portsByName);
+          task -> mapAndAssignResources(offer, task));
       LOG.info(
           "Offer on slave {} (id {}) is being assigned task for {}.",
           host, offer.getSlaveId().getValue(), taskId);
@@ -158,7 +161,6 @@ public interface TaskAssigner {
           TaskInfo taskInfo = assign(
               storeProvider,
               offer.getOffer(),
-              resourceRequest.getRequestedPorts(),
               taskId);
 
           try {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
index 7313279..98fe860 100644
--- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
@@ -298,6 +298,17 @@ public class ConfigurationManagerTest {
     DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
   }
 
+  @Test
+  public void testTaskLinks() throws Exception {
+    TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder();
+    builder.addToResources(namedPort("health"));
+    builder.unsetTaskLinks();
+
+    ITaskConfig populated =
+        DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder));
+    assertEquals(ImmutableSet.of("health", "http"), populated.getTaskLinks().keySet());
+  }
+
   private void expectTaskDescriptionException(String message) {
     expectedException.expect(TaskDescriptionException.class);
     expectedException.expectMessage(message);

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 6370a12..94a885f 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -103,18 +104,18 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     ResourceSlot twoPorts = Resources.from(
         Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81))).slot();
 
-    ITaskConfig noPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
-        .newBuilder()
-        .setRequestedPorts(ImmutableSet.of()));
-    ITaskConfig onePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
-        .newBuilder()
-        .setRequestedPorts(ImmutableSet.of("one")));
-    ITaskConfig twoPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
-        .newBuilder()
-        .setRequestedPorts(ImmutableSet.of("one", "two")));
-    ITaskConfig threePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK)
-        .newBuilder()
-        .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
+    ITaskConfig noPortTask = resetPorts(
+        makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+        ImmutableSet.of());
+    ITaskConfig onePortTask = resetPorts(
+        makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+        ImmutableSet.of("one"));
+    ITaskConfig twoPortTask = resetPorts(
+        makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+        ImmutableSet.of("one", "two"));
+    ITaskConfig threePortTask = resetPorts(
+        makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK),
+        ImmutableSet.of("one", "two", "three"));
 
     Set<Veto> none = ImmutableSet.of();
     IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index bf18d5d..ad397c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -66,6 +66,7 @@ import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXEC
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG;
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
 import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosRangeResource;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -187,8 +188,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
   @Test
   public void testCreateFromPortsUnset() {
     AssignedTask builder = TASK.newBuilder();
-    builder.getTask().unsetRequestedPorts();
     builder.unsetAssignedPorts();
+    builder.setTask(
+        resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder());
     IAssignedTask assignedTask = IAssignedTask.build(builder);
     expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER);
     taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO);
@@ -353,8 +355,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
   public void testPopulateDiscoveryInfoNoPort() {
     config = new ExecutorSettings(THERMOS_CONFIG, true);
     AssignedTask builder = TASK.newBuilder();
-    builder.getTask().unsetRequestedPorts();
     builder.unsetAssignedPorts();
+    builder.setTask(
+        resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder());
     IAssignedTask assignedTask = IAssignedTask.build(builder);
     expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER);
     taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO);

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
index d5f2172..36c5c11 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java
@@ -29,6 +29,9 @@ import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Resource;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -51,10 +54,10 @@ public class AcceptedOfferTest {
 
   @Test
   public void testReservedPredicates() {
-    Protos.Resource withRole = makeScalar(CPUS.getMesosName(), TEST_ROLE, false, 1.0);
+    Protos.Resource withRole = mesosScalar(CPUS, TEST_ROLE, false, 1.0);
     assertTrue(AcceptedOffer.RESERVED.apply(withRole));
     assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole));
-    Protos.Resource absentRole = makeScalar(CPUS.getMesosName(), ABSENT_ROLE, false, 1.0);
+    Protos.Resource absentRole = mesosScalar(CPUS, ABSENT_ROLE, false, 1.0);
     assertFalse(AcceptedOffer.RESERVED.apply(absentRole));
     assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole));
   }
@@ -62,7 +65,7 @@ public class AcceptedOfferTest {
   @Test
   public void testAllocateEmpty() {
     AcceptedOffer acceptedOffer = AcceptedOffer.create(
-        fakeOffer(Collections.emptyList()),
+        offer(),
         ResourceSlot.NONE,
         ResourceSlot.NONE,
         ImmutableSet.of(),
@@ -73,12 +76,10 @@ public class AcceptedOfferTest {
 
   @Test
   public void testAllocateRange() {
-    List<Resource> resources = ImmutableList.<Resource>builder()
-        .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93))
-        .add(makePortResource(TEST_ROLE, 100, 101))
-        .build();
     AcceptedOffer acceptedOffer = AcceptedOffer.create(
-        fakeOffer(resources),
+        offer(
+            mesosRange(PORTS, Optional.absent(), 80, 81, 90, 91, 92, 93),
+            mesosRange(PORTS, TEST_ROLE, 100, 101)),
         ResourceSlot.NONE,
         ResourceSlot.NONE,
         ImmutableSet.of(80, 90, 100),
@@ -87,8 +88,8 @@ public class AcceptedOfferTest {
     List<Resource> expected = ImmutableList.<Resource>builder()
         // Because we prefer reserved resources and handle them before non-reserved resources,
         // result should have ports for the reserved resources first.
-        .add(makePortResource(TEST_ROLE, 100))
-        .add(makePortResource(Optional.absent(), 80, 90))
+        .add(mesosRange(PORTS, TEST_ROLE, 100))
+        .add(mesosRange(PORTS, Optional.absent(), 80, 90))
         .build();
     assertEquals(expected, acceptedOffer.getTaskResources());
     assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
@@ -96,11 +97,10 @@ public class AcceptedOfferTest {
 
   @Test(expected = Resources.InsufficientResourcesException.class)
   public void testAllocateRangeInsufficent() {
-    List<Resource> resources = ImmutableList.of(
-        makePortResource(ABSENT_ROLE, 80),
-        makePortResource(ABSENT_ROLE, 100, 101));
     AcceptedOffer.create(
-        fakeOffer(resources),
+        offer(
+            mesosRange(PORTS, ABSENT_ROLE, 80),
+            mesosRange(PORTS, ABSENT_ROLE, 100, 101)),
         ResourceSlot.NONE,
         ResourceSlot.NONE,
         ImmutableSet.of(80, 90, 100),
@@ -116,52 +116,43 @@ public class AcceptedOfferTest {
   }
 
   private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) {
-    List<Resource> resources = ImmutableList.<Resource>builder()
-        .add(makeScalar(
-            CPUS.getMesosName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus()))
-        .add(makeScalar(
-            RAM_MB.getMesosName(), role, false, TOTAL_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(role, TASK_PORTS))
-        .build();
-    Protos.Offer offer = fakeOffer(resources);
+    Protos.Offer offer = offer(
+        mesosScalar(CPUS, role, cpuRevocable, TOTAL_SLOT.getNumCpus()),
+        mesosScalar(RAM_MB, role, false, TOTAL_SLOT.getRam().as(Data.MB)),
+        mesosScalar(DISK_MB, role, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+        mesosRange(PORTS, role, TASK_PORTS));
 
     AcceptedOffer offerAllocation = AcceptedOffer.create(
         offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
 
     List<Resource> taskList = ImmutableList.<Resource>builder()
-        .add(makeScalar(CPUS.getMesosName(), role, cpuRevocable, TASK_SLOT.getNumCpus()))
-        .add(makeScalar(RAM_MB.getMesosName(), role, false, TASK_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), role, false, TASK_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(role, TASK_PORTS))
+        .add(mesosScalar(CPUS, role, cpuRevocable, TASK_SLOT.getNumCpus()))
+        .add(mesosScalar(RAM_MB, role, false, TASK_SLOT.getRam().as(Data.MB)))
+        .add(mesosScalar(
+            DISK_MB, role, false, TASK_SLOT.getDisk().as(Data.MB)))
+        .add(mesosRange(PORTS, role, TASK_PORTS))
         .build();
     assertEquals(taskList, offerAllocation.getTaskResources());
 
     List<Resource> executorList = ImmutableList.<Resource>builder()
-        .add(makeScalar(
-            CPUS.getMesosName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
-        .add(makeScalar(
-            RAM_MB.getMesosName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+        .add(mesosScalar(
+            CPUS, role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(mesosScalar(
+            RAM_MB, role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+        .add(mesosScalar(
+            DISK_MB, role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
         .build();
     assertEquals(executorList, offerAllocation.getExecutorResources());
   }
 
   @Test(expected = Resources.InsufficientResourcesException.class)
   public void testAllocateSingleRoleInsufficient() {
-    List<Resource> resources = ImmutableList.<Resource>builder()
+    Protos.Offer offer = offer(
         // EXECUTOR_SLOT's CPU is not included here.
-        .add(makeScalar(CPUS.getMesosName(), TEST_ROLE, false, TASK_SLOT.getNumCpus()))
-        .add(makeScalar(
-            RAM_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(TEST_ROLE, TASK_PORTS))
-        .build();
-    Protos.Offer offer = fakeOffer(resources);
+        mesosScalar(CPUS, TEST_ROLE, false, TASK_SLOT.getNumCpus()),
+        mesosScalar(RAM_MB, TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
+        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+        mesosRange(PORTS, TEST_ROLE, TASK_PORTS));
 
     AcceptedOffer.create(
         offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
@@ -174,136 +165,64 @@ public class AcceptedOfferTest {
   }
 
   private void runMultipleRoles(boolean cpuRevocable) {
-    List<Resource> resources = ImmutableList.<Resource>builder()
+    Protos.Offer offer = offer(
         // Make cpus come from two roles.
-        .add(makeScalar(
-            CPUS.getMesosName(),
-            TEST_ROLE,
-            cpuRevocable,
-            EXECUTOR_SLOT.getNumCpus()))
-        .add(makeScalar(
-            CPUS.getMesosName(),
-            ABSENT_ROLE,
-            cpuRevocable,
-            TASK_SLOT.getNumCpus()))
+        mesosScalar(CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()),
+        mesosScalar(CPUS, ABSENT_ROLE, cpuRevocable, TASK_SLOT.getNumCpus()),
         // Make ram come from default role
-        .add(makeScalar(
-            RAM_MB.getMesosName(),
-            ABSENT_ROLE,
-            false,
-            TOTAL_SLOT.getRam().as(Data.MB)))
+        mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
         // Make disk come from non-default role.
-        .add(makeScalar(
-            DISK_MB.getMesosName(),
-            TEST_ROLE,
-            false,
-            TOTAL_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(TEST_ROLE, 80))
-        .add(makePortResource(ABSENT_ROLE, 90))
-        .build();
-
-    Protos.Offer offer = fakeOffer(resources);
+        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+        mesosRange(PORTS, TEST_ROLE, 80),
+        mesosRange(PORTS, ABSENT_ROLE, 90));
 
     AcceptedOffer offerAllocation = AcceptedOffer.create(
         offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable));
 
     List<Resource> taskList = ImmutableList.<Resource>builder()
         // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus.
-        .add(makeScalar(
-            CPUS.getMesosName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
-        .add(makeScalar(
-            CPUS.getMesosName(),
+        .add(mesosScalar(
+            CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(mesosScalar(
+            CPUS,
             ABSENT_ROLE,
             cpuRevocable,
             TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus()))
-        .add(makeScalar(
-            RAM_MB.getMesosName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(TEST_ROLE, 80))
-        .add(makePortResource(ABSENT_ROLE, 90))
+        .add(mesosScalar(
+            RAM_MB, ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
+        .add(mesosScalar(
+            DISK_MB, TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
+        .add(mesosRange(PORTS, TEST_ROLE, 80))
+        .add(mesosRange(PORTS, ABSENT_ROLE, 90))
         .build();
     assertEquals(taskList, offerAllocation.getTaskResources());
 
     List<Resource> executorList = ImmutableList.<Resource>builder()
-        .add(makeScalar(
-            CPUS.getMesosName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
-        .add(makeScalar(
-            RAM_MB.getMesosName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
-        .add(makeScalar(
-            DISK_MB.getMesosName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+        .add(mesosScalar(
+            CPUS, ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(mesosScalar(
+            RAM_MB, ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+        .add(mesosScalar(
+            DISK_MB, TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
         .build();
     assertEquals(executorList, offerAllocation.getExecutorResources());
   }
 
   @Test(expected = Resources.InsufficientResourcesException.class)
   public void testMultipleRolesInsufficient() {
-    // Similar to testMultipleRoles, but make some of cpus as revocable
-    List<Resource> resources = ImmutableList.<Resource>builder()
+    Protos.Offer offer = offer(
+        // Similar to testMultipleRoles, but with some of the cpus marked as revocable.
         // Make cpus come from two roles.
-        .add(makeScalar(
-            CPUS.getMesosName(),
-            TEST_ROLE,
-            true,
-            EXECUTOR_SLOT.getNumCpus()))
-        .add(makeScalar(
-            CPUS.getMesosName(),
-            ABSENT_ROLE,
-            false,
-            TASK_SLOT.getNumCpus()))
+        mesosScalar(CPUS, TEST_ROLE, true, EXECUTOR_SLOT.getNumCpus()),
+        mesosScalar(CPUS, ABSENT_ROLE, false, TASK_SLOT.getNumCpus()),
         // Make ram come from default role
-        .add(makeScalar(
-            RAM_MB.getMesosName(),
-            ABSENT_ROLE,
-            false,
-            TOTAL_SLOT.getRam().as(Data.MB)))
+        mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)),
         // Make disk come from non-default role.
-        .add(makeScalar(
-            DISK_MB.getMesosName(),
-            TEST_ROLE,
-            false,
-            TOTAL_SLOT.getDisk().as(Data.MB)))
-        .add(makePortResource(TEST_ROLE, 80))
-        .add(makePortResource(ABSENT_ROLE, 90))
-        .build();
-    Protos.Offer offer = fakeOffer(resources);
+        mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)),
+        mesosRange(PORTS, TEST_ROLE, 80),
+        mesosRange(PORTS, ABSENT_ROLE, 90));
     // We don't have enough resource to satisfy a non-revocable request.
     AcceptedOffer.create(
         offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false));
   }
-
-  private static Resource makePortResource(Optional<String> role, Integer... values) {
-    Resource.Builder prototype = Resource.newBuilder()
-        .setType(Protos.Value.Type.RANGES)
-        .setName(PORTS.getMesosName());
-    if (role.isPresent()) {
-      prototype.setRole(role.get());
-    }
-    return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values));
-  }
-
-  private static Resource makeScalar(
-      String name, Optional<String> role, boolean revocable, double value) {
-    Resource.Builder resource = Resource.newBuilder()
-        .setName(name)
-        .setType(Protos.Value.Type.SCALAR)
-        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
-    if (role.isPresent()) {
-      resource.setRole(role.get());
-    }
-    if (revocable) {
-      resource.setRevocable(Resource.RevocableInfo.getDefaultInstance());
-    }
-    return resource.build();
-  }
-
-  private static Protos.Offer fakeOffer(List<Resource> resources) {
-    return Protos.Offer.newBuilder()
-        .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
-        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
-        .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
-        .setHostname("hostname")
-        .addAllResources(resources)
-        .build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
new file mode 100644
index 0000000..c94f7a9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.junit.Assert.assertEquals;
+
+public class PortMapperTest {
+  @Test
+  public void testAssignNoPorts() {
+    ScheduledTask builder = makeTask("id", JOB).newBuilder();
+    builder.getAssignedTask().getTask().unsetResources();
+    builder.getAssignedTask().unsetAssignedPorts();
+    IScheduledTask task = IScheduledTask.build(builder);
+
+    assertEquals(task, PORT_MAPPER.mapAndAssign(offer(), task));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testPortRangeScarcity() {
+    PORT_MAPPER.mapAndAssign(offer(), makeTask("id", JOB));
+  }
+
+  @Test
+  public void testPortRangeAbundance() {
+    Protos.Offer offer = offer(mesosRange(PORTS, 1, 2, 3, 4, 5));
+    assertEquals(
+        1,
+        PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB))
+            .getAssignedTask().getAssignedPorts().size());
+  }
+
+  @Test
+  public void testPortRangeExact() {
+    Protos.Offer offer = offer(mesosRange(PORTS, 1));
+    assertEquals(
+        1,
+        PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB))
+            .getAssignedTask().getAssignedPorts().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
new file mode 100644
index 0000000..b6810b1
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.resources;
+
+import java.util.EnumSet;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Value.Scalar;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.Resource.namedPort;
+import static org.apache.aurora.gen.Resource.numCpus;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.apache.mesos.Protos.Value.Type.SCALAR;
+import static org.junit.Assert.assertEquals;
+
+public class ResourceManagerTest {
+  @Test
+  public void testGetOfferResources() {
+    Protos.Resource resource1 = Protos.Resource.newBuilder()
+        .setType(SCALAR)
+        .setName(CPUS.getMesosName())
+        .setScalar(Scalar.newBuilder().setValue(2.0).build())
+        .build();
+
+    Protos.Resource resource2 = Protos.Resource.newBuilder()
+        .setType(SCALAR)
+        .setName(RAM_MB.getMesosName())
+        .setScalar(Scalar.newBuilder().setValue(64).build())
+        .build();
+
+    Offer offer = Offer.newBuilder()
+        .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+        .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+        .setHostname("hostname")
+        .addAllResources(ImmutableSet.of(resource1, resource2)).build();
+
+    assertEquals(
+        resource1,
+        Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, CPUS)));
+    assertEquals(
+        resource2,
+        Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, RAM_MB)));
+  }
+
+  @Test
+  public void testGetTaskResources() {
+    assertEquals(
+        IResource.build(numCpus(1.0)),
+        Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), CPUS)));
+    assertEquals(
+        IResource.build(namedPort("http")),
+        Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), PORTS)));
+  }
+
+  @Test
+  public void testGetTaskResourceTypes() {
+    ScheduledTask builder = makeTask("id", JOB).newBuilder();
+    builder.getAssignedTask().getTask().addToResources(namedPort("health"));
+
+    assertEquals(
+        EnumSet.allOf(ResourceType.class),
+        ResourceManager.getTaskResourceTypes(IScheduledTask.build(builder)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
index 04a8238..1583cef 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java
@@ -13,14 +13,26 @@
  */
 package org.apache.aurora.scheduler.resources;
 
+import java.util.Set;
+
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 
+import org.apache.aurora.gen.Resource;
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Numbers;
+import org.apache.aurora.scheduler.storage.entities.IResource;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos;
 
 import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.fromResource;
 
 /**
  * Convenience methods for working with resources.
@@ -45,4 +57,77 @@ public final class ResourceTestUtil {
         .setRamMb(ramMb)
         .setDiskMb(diskMb));
   }
+
+  public static ITaskConfig resetPorts(ITaskConfig config, Set<String> portNames) {
+    TaskConfig builder = config.newBuilder()
+        .setRequestedPorts(portNames);
+    builder.getResources().removeIf(e -> fromResource(IResource.build(e)).equals(PORTS));
+    portNames.forEach(e -> builder.addToResources(Resource.namedPort(e)));
+    return ITaskConfig.build(builder);
+  }
+
+  public static Protos.Resource mesosScalar(ResourceType type, double value) {
+    return mesosScalar(type, Optional.absent(), false, value);
+  }
+
+  public static Protos.Resource mesosScalar(ResourceType type, double value, boolean revocable) {
+    return mesosScalar(type, Optional.absent(), revocable, value);
+  }
+
+  public static Protos.Resource mesosScalar(
+      ResourceType type,
+      Optional<String> role,
+      boolean revocable,
+      double value) {
+
+    return resourceBuilder(type, role, revocable)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value).build())
+        .build();
+  }
+
+  public static Protos.Resource mesosRange(ResourceType type, Integer... values) {
+    return mesosRange(type, Optional.absent(), values);
+  }
+
+  public static Protos.Resource mesosRange(
+      ResourceType type,
+      Optional<String> role,
+      Integer... values) {
+
+    return resourceBuilder(type, role, false)
+        .setRanges(Protos.Value.Ranges.newBuilder().addAllRange(
+            Iterables.transform(
+                Numbers.toRanges(ImmutableSet.copyOf(values)),
+                ResourceSlot.RANGE_TRANSFORM)))
+        .build();
+  }
+
+  public static Protos.Offer offer(Protos.Resource... resources) {
+    // Pass a list, not a set: a set would silently collapse equal resources into one.
+    return Protos.Offer.newBuilder().setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+        .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+        .setHostname("hostname")
+        .addAllResources(java.util.Arrays.asList(resources)).build();
+  }
+
+  private static Protos.Resource.Builder resourceBuilder(
+      ResourceType type,
+      Optional<String> role,
+      boolean revocable) {
+
+    Protos.Resource.Builder builder = Protos.Resource.newBuilder()
+        .setType(type.getMesosType())
+        .setName(type.getMesosName());
+
+    if (revocable) {
+      builder.setRevocable(Protos.Resource.RevocableInfo.getDefaultInstance());
+    }
+
+    if (role.isPresent()) {
+      builder.setRole(role.get());
+    }
+
+    return builder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
index ceb81e4..185338e 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java
@@ -13,164 +13,64 @@
  */
 package org.apache.aurora.scheduler.resources;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.scheduler.resources.Resources.InsufficientResourcesException;
 import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.Value.Range;
-import org.apache.mesos.Protos.Value.Ranges;
 import org.junit.Test;
 
 import static org.apache.aurora.common.quantity.Data.MB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
-import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosResource;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
 import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-import static org.apache.mesos.Protos.Value.Type.RANGES;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 public class ResourcesTest {
   @Test
-  public void testPortRangeExact() {
-    Resource portsResource = createPortRange(Pair.of(1, 5));
-    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
-    assertEquals(5, ports.size());
-  }
-
-  @Test
-  public void testOnePortAvailable() {
-    Resource portsResource = createPortRange(Pair.of(3, 3));
-    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(1);
-    assertEquals(1, ports.size());
-  }
-
-  @Test
-  public void testPortRangeAbundance() {
-    Resource portsResource = createPortRange(Pair.of(1, 10));
-    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
-    assertEquals(5, ports.size());
-  }
-
-  @Test
-  public void testPortRangeExhaust() {
-    Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15));
-
-    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(7);
-    assertEquals(7, ports.size());
-
-    ports = Resources.from(createOffer(portsResource)).getPorts(8);
-    assertEquals(8, ports.size());
-
-    try {
-      Resources.from(createOffer(portsResource)).getPorts(9);
-      fail("Ports should not have been sufficient");
-    } catch (InsufficientResourcesException e) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testGetNoPorts() {
-    Resource portsResource = createPortRange(Pair.of(1, 5));
-    assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0));
-  }
-
-  @Test(expected = Resources.InsufficientResourcesException.class)
-  public void testPortRangeScarcity() {
-    Resource portsResource = createPortRange(Pair.of(1, 2));
-    Resources.from(createOffer(portsResource)).getPorts(5);
-  }
-
-  @Test
   public void testGetSlot() {
-    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
-        .add(makeMesosResource(CPUS, 8.0, false))
-        .add(makeMesosResource(RAM_MB, 1024, false))
-        .add(makeMesosResource(DISK_MB, 2048, false))
-        .add(createPortRange(Pair.of(1, 10)))
-        .build();
-
-    ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10);
-    assertEquals(expected, Resources.from(createOffer(resources)).slot());
+    Protos.Offer offer = offer(
+        mesosScalar(CPUS, 8.0, false),
+        mesosScalar(RAM_MB, 1024, false),
+        mesosScalar(DISK_MB, 2048, false),
+        mesosRange(PORTS, 1, 2, 3));
+
+    ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 3);
+    assertEquals(expected, Resources.from(offer).slot());
   }
 
   @Test
   public void testMissingResourcesHandledGracefully() {
-    ImmutableList<Resource> resources = ImmutableList.<Resource>builder().build();
-    assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot());
+    assertEquals(ResourceSlot.NONE, Resources.from(offer()).slot());
   }
 
   @Test
   public void testFilter() {
-    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
-        .add(makeMesosResource(CPUS, 8.0, true))
-        .add(makeMesosResource(RAM_MB, 1024, false))
-        .build();
+    Protos.Offer offer = offer(
+        mesosScalar(CPUS, 8.0, true),
+        mesosScalar(RAM_MB, 1024, false));
 
     assertEquals(
         new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
-        Resources.from(createOffer(resources)).filter(Resources.REVOCABLE).slot());
+        Resources.from(offer).filter(Resources.REVOCABLE).slot());
   }
 
   @Test
   public void testFilterByTier() {
-    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
-        .add(makeMesosResource(CPUS, 8.0, true))
-        .add(makeMesosResource(CPUS, 8.0, false))
-        .add(makeMesosResource(RAM_MB, 1024, false))
-        .build();
+    Protos.Offer offer = offer(
+        mesosScalar(CPUS, 8.0, true),
+        mesosScalar(CPUS, 8.0, false),
+        mesosScalar(RAM_MB, 1024, false));
 
     assertEquals(
         new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
-        Resources.from(createOffer(resources)).filter(REVOCABLE_TIER).slot());
+        Resources.from(offer).filter(REVOCABLE_TIER).slot());
 
     assertEquals(
         new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0),
-        Resources.from(createOffer(resources)).filter(DEV_TIER).slot());
-  }
-
-  private Resource createPortRange(Pair<Integer, Integer> range) {
-    return createPortRanges(ImmutableSet.of(range));
-  }
-
-  private Resource createPortRanges(Pair<Integer, Integer> rangeA, Pair<Integer, Integer> rangeB) {
-    return createPortRanges(
-        ImmutableSet.<Pair<Integer, Integer>>builder().add(rangeA).add(rangeB).build());
-  }
-
-  private Resource createPortRanges(Set<Pair<Integer, Integer>> ports) {
-    Ranges.Builder ranges = Ranges.newBuilder();
-    for (Pair<Integer, Integer> range : ports) {
-      ranges.addRange(Range.newBuilder().setBegin(range.getFirst()).setEnd(range.getSecond()));
-    }
-
-    return Resource.newBuilder()
-        .setName(PORTS.getMesosName())
-        .setType(RANGES)
-        .setRanges(ranges)
-        .build();
-  }
-
-  private static Protos.Offer createOffer(Resource resource) {
-    return createOffer(ImmutableList.of(resource));
-  }
-
-  private static Protos.Offer createOffer(Iterable<Resource> resources) {
-    return Protos.Offer.newBuilder()
-        .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
-        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
-        .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
-        .setHostname("hostname")
-        .addAllResources(resources).build();
+        Resources.from(offer).filter(DEV_TIER).slot());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 498da78..2370178 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -17,6 +17,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -44,6 +46,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -69,6 +72,8 @@ import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL;
 import static org.apache.aurora.scheduler.state.StateChangeResult.INVALID_CAS_STATE;
 import static org.apache.aurora.scheduler.state.StateChangeResult.NOOP;
@@ -403,14 +408,10 @@ public class StateManagerImplTest extends EasyMockTest {
         storeProvider -> stateManager.deleteTasks(storeProvider, ImmutableSet.of(taskId)));
   }
 
-  private static ITaskConfig setRequestedPorts(ITaskConfig config, Set<String> portNames) {
-    return ITaskConfig.build(config.newBuilder().setRequestedPorts(portNames));
-  }
-
   @Test
   public void testPortResource() throws Exception {
     Set<String> requestedPorts = ImmutableSet.of("one", "two", "three");
-    ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts);
+    ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts);
 
     String taskId = "a";
     expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
@@ -425,13 +426,15 @@ public class StateManagerImplTest extends EasyMockTest {
 
     assertEquals(
         requestedPorts,
-        actual.getAssignedTask().getTask().getRequestedPorts());
+        StreamSupport.stream(ResourceManager.getTaskResources(actual, PORTS).spliterator(), false)
+          .map(e -> e.getNamedPort())
+          .collect(Collectors.toSet()));
   }
 
   @Test
   public void testPortResourceResetAfterReschedule() throws Exception {
     Set<String> requestedPorts = ImmutableSet.of("one");
-    ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts);
+    ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts);
 
     String taskId = "a";
     expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
@@ -561,6 +564,10 @@ public class StateManagerImplTest extends EasyMockTest {
         taskId,
         host.getHost(),
         SlaveID.newBuilder().setValue(host.getSlaveId()).build(),
-        ports));
+        e -> {
+          ScheduledTask builder = e.newBuilder();
+          builder.getAssignedTask().setAssignedPorts(ports);
+          return IScheduledTask.build(builder);
+        }));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index a2df311..ca10323 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,8 +20,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduledTask;
@@ -56,41 +54,31 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TaskAssignerImplTest extends EasyMockTest {
 
-  private static final int PORT = 5000;
-  private static final String SLAVE_ID = "slaveId";
-  private static final Offer MESOS_OFFER = Offer.newBuilder()
-      .setId(OfferID.newBuilder().setValue("offerId"))
-      .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-      .setSlaveId(SlaveID.newBuilder().setValue(SLAVE_ID))
-      .setHostname("hostName")
-      .addResources(Resource.newBuilder()
-          .setName("ports")
-          .setType(Type.RANGES)
-          .setRanges(
-              Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
-      .build();
+  private static final int PORT = 1000;
+  private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
+  private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue();
   private static final HostOffer OFFER =
       new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
-  private static final String PORT_NAME = "http";
-  private static final IScheduledTask TASK = IScheduledTask.build(
-      new ScheduledTask()
-          .setAssignedTask(new AssignedTask()
-              .setTaskId("taskId")
-              .setTask(new TaskConfig()
-                  .setJob(new JobKey("r", "e", "n"))
-                  .setExecutorConfig(new ExecutorConfig().setData("opaque data"))
-                  .setRequestedPorts(ImmutableSet.of(PORT_NAME)))));
+  private static final IScheduledTask TASK = makeTask("id", JOB);
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
@@ -108,7 +96,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private SchedulingFilter filter;
   private MesosTaskFactory taskFactory;
   private OfferManager offerManager;
-  private TaskAssigner assigner;
+  private TaskAssignerImpl assigner;
   private TierManager tierManager;
 
   @Before
@@ -128,13 +116,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
-    expect(stateManager.assignTask(
-        storeProvider,
-        Tasks.id(TASK),
-        MESOS_OFFER.getHostname(),
-        MESOS_OFFER.getSlaveId(),
-        ImmutableMap.of(PORT_NAME, PORT)))
-        .andReturn(TASK.getAssignedTask());
+    expectAssignTask(MESOS_OFFER);
     expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
         .andReturn(TASK_INFO);
 
@@ -190,13 +172,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
-    expect(stateManager.assignTask(
-        storeProvider,
-        Tasks.id(TASK),
-        MESOS_OFFER.getHostname(),
-        MESOS_OFFER.getSlaveId(),
-        ImmutableMap.of(PORT_NAME, PORT)))
-        .andReturn(TASK.getAssignedTask());
+    expectAssignTask(MESOS_OFFER);
     expect(stateManager.changeState(
         storeProvider,
         Tasks.id(TASK),
@@ -254,13 +230,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of());
-    expect(stateManager.assignTask(
-        storeProvider,
-        Tasks.id(TASK),
-        offer.getOffer().getHostname(),
-        offer.getOffer().getSlaveId(),
-        ImmutableMap.of(PORT_NAME, PORT)))
-        .andReturn(TASK.getAssignedTask());
+    expectAssignTask(offer.getOffer());
     expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
         .andReturn(TASK_INFO);
     offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
@@ -306,13 +276,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
         .andReturn(ImmutableSet.of());
 
-    expect(stateManager.assignTask(
-        storeProvider,
-        Tasks.id(TASK),
-        OFFER.getOffer().getHostname(),
-        OFFER.getOffer().getSlaveId(),
-        ImmutableMap.of(PORT_NAME, PORT)))
-        .andReturn(TASK.getAssignedTask());
+    expectAssignTask(MESOS_OFFER);
     expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
         .andReturn(TASK_INFO);
     offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
@@ -326,4 +290,23 @@ public class TaskAssignerImplTest extends EasyMockTest {
         Tasks.id(TASK),
         ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
+
+  @Test
+  public void testResourceMapperCallback() {
+    ScheduledTask withoutPorts = TASK.newBuilder();
+    withoutPorts.getAssignedTask().unsetAssignedPorts();
+    IScheduledTask unassigned = IScheduledTask.build(withoutPorts);
+
+    control.replay();
+    assertEquals(TASK, assigner.mapAndAssignResources(MESOS_OFFER, unassigned));
+  }
+
+  private void expectAssignTask(Offer offer) {  // records an expected assignTask() call
+    expect(stateManager.assignTask(
+        eq(storeProvider),
+        eq(Tasks.id(TASK)),
+        eq(offer.getHostname()),
+        eq(offer.getSlaveId()),
+        anyObject())).andReturn(TASK.getAssignedTask());  // last argument is matched loosely
+  }
 }