You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/01/14 19:48:03 UTC

aurora git commit: Accept resource offers from multiple framework roles.

Repository: aurora
Updated Branches:
  refs/heads/master 952ef6db3 -> a80260eaf


Accept resource offers from multiple framework roles.

Bugs closed: AURORA-1109

Reviewed at https://reviews.apache.org/r/42126/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a80260ea
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a80260ea
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a80260ea

Branch: refs/heads/master
Commit: a80260eafcd652e75706f9ad5a5a1886c7b051ef
Parents: 952ef6d
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Jan 14 10:43:32 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Jan 14 10:43:32 2016 -0800

----------------------------------------------------------------------
 NEWS                                            |   2 +
 .../apache/aurora/scheduler/AcceptedOffer.java  | 234 ++++++++++++++
 .../apache/aurora/scheduler/ResourceSlot.java   |  50 ++-
 .../org/apache/aurora/scheduler/Resources.java  |  37 +--
 .../mesos/CommandLineDriverSettingsModule.java  |  19 +-
 .../scheduler/mesos/MesosTaskFactory.java       |  58 ++--
 .../aurora/scheduler/state/TaskAssigner.java    |   2 +-
 .../aurora/scheduler/AcceptedOfferTest.java     | 303 +++++++++++++++++++
 .../aurora/scheduler/ResourceSlotTest.java      |  14 +-
 .../CommandLineDriverSettingsModuleTest.java    |  23 +-
 .../mesos/MesosTaskFactoryImplTest.java         |  61 +++-
 .../scheduler/state/TaskAssignerImplTest.java   |   8 +-
 12 files changed, 714 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index acaff9e..809077f 100644
--- a/NEWS
+++ b/NEWS
@@ -21,6 +21,8 @@
   at http://logback.qos.ch/manual/configuration.html
   With this change, we have removed the following scheduler command line arguments as they were
   made redundant: `logtostderr`, `alsologtostderr`, `vlog`, `vmodule`, and `use_glog_formatter`.
+- Added support for configuring the Mesos role by passing `-mesos_role` to the Aurora scheduler at start time.
+  This enables resource reservation for Aurora when running in a shared Mesos cluster.
 
 0.11.0
 ------

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
new file mode 100644
index 0000000..9c2dc0b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.scheduler.base.Numbers;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo.
+ */
+public final class AcceptedOffer {
+
+  public static final String DEFAULT_ROLE_NAME = "*";
+
+  /**
+   * Reserved resource filter.
+   */
+  public static final Predicate<Resource> RESERVED =
+      e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME);
+
+  /**
+   * Non-reserved resource filter.
+   */
+  public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED);
+
+  /**
+   * Helper function to check whether a resource value is small enough to be considered zero.
+   */
+  public static boolean nearZero(double value) {
+    return Math.abs(value) < EPSILON;
+  }
+
+  /**
+   * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources.
+   * @return A list of Resource used for TaskInfo.
+   */
+  public List<Resource> getTaskResources() {
+    return taskResources;
+  }
+
+  /**
+   * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources.
+   * @return A list of Resource used for ExecutorInfo.
+   */
+  public List<Resource> getExecutorResources() {
+    return executorResources;
+  }
+
+  /**
+   * Use this epsilon value to avoid comparison with zero.
+   */
+  private static final double EPSILON = 1e-6;
+
+  private final List<Resource> taskResources;
+  private final List<Resource> executorResources;
+
+  public static AcceptedOffer create(
+      Offer offer,
+      ResourceSlot taskSlot,
+      ResourceSlot executorSlot,
+      Set<Integer> selectedPorts,
+      TierInfo tierInfo) throws Resources.InsufficientResourcesException {
+
+    List<Resource> reservedFirst = ImmutableList.<Resource>builder()
+        .addAll(Iterables.filter(offer.getResourcesList(), RESERVED))
+        .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED))
+        .build();
+
+    boolean revocable = tierInfo.isRevocable();
+    List<Resource.Builder> cpuResources = filterToBuilders(
+        reservedFirst,
+        ResourceType.CPUS.getName(),
+        revocable ? Resources.REVOCABLE : Resources.NON_REVOCABLE);
+    List<Resource.Builder> memResources = filterToBuilderNonRevocable(
+        reservedFirst, ResourceType.RAM_MB.getName());
+    List<Resource.Builder> diskResources = filterToBuilderNonRevocable(
+        reservedFirst, ResourceType.DISK_MB.getName());
+    List<Resource.Builder> portsResources = filterToBuilderNonRevocable(
+        reservedFirst, ResourceType.PORTS.getName());
+
+    List<Resource> taskResources = ImmutableList.<Resource>builder()
+        .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable))
+        .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false))
+        .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false))
+        .addAll(allocateRangeType(portsResources, selectedPorts))
+        .build();
+
+    List<Resource> executorResources = ImmutableList.<Resource>builder()
+        .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable))
+        .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false))
+        .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false))
+        .build();
+
+    return new AcceptedOffer(taskResources, executorResources);
+  }
+
+  private AcceptedOffer(
+      List<Resource> taskResources,
+      List<Resource> executorResources) {
+
+    this.taskResources = requireNonNull(taskResources);
+    this.executorResources = requireNonNull(executorResources);
+  }
+
+  private static List<Resource> allocateRangeType(
+      List<Resource.Builder> from,
+      Set<Integer> valueSet) throws Resources.InsufficientResourcesException {
+
+    Set<Integer> leftOver = Sets.newHashSet(valueSet);
+    ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder();
+    for (Resource.Builder r : from) {
+      Set<Integer> fromResource = Sets.newHashSet(Iterables.concat(
+          Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS)));
+      Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource));
+      if (available.isEmpty()) {
+        continue;
+      }
+      Resource newResource = makeMesosRangeResource(r.build(), available);
+      result.add(newResource);
+      leftOver.removeAll(available);
+      if (leftOver.isEmpty()) {
+        break;
+      }
+    }
+    if (!leftOver.isEmpty()) {
+      // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+      // consistent.
+      // Maybe we should consider implementing resource veto with this class to ensure that.
+      throw new Resources.InsufficientResourcesException(
+          "Insufficient resource for range type when allocating from offer");
+    }
+    return result.build();
+  }
+
+  /**
+   * Creates a mesos resource of integer ranges from given prototype.
+   *
+   * @param prototype Resource prototype.
+   * @param values    Values to translate into ranges.
+   * @return A new mesos ranges resource.
+   */
+  static Resource makeMesosRangeResource(
+      Resource prototype,
+      Set<Integer> values) {
+
+    return Protos.Resource.newBuilder(prototype)
+        .setRanges(Protos.Value.Ranges.newBuilder()
+            .addAllRange(
+                Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM)))
+        .build();
+  }
+
+  private static List<Resource> allocateScalarType(
+      List<Resource.Builder> from,
+      double amount,
+      boolean revocable) throws Resources.InsufficientResourcesException {
+
+    double remaining = amount;
+    ImmutableList.Builder<Resource> result = ImmutableList.builder();
+    for (Resource.Builder r : from) {
+      if (nearZero(remaining)) {
+        break;
+      }
+      final double available = r.getScalar().getValue();
+      if (nearZero(available)) {
+        // Skip resource slot that is already used up.
+        continue;
+      }
+      final double used = Math.min(remaining, available);
+      remaining -= used;
+      Resource.Builder newResource =
+          Resource.newBuilder(r.build())
+              .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build());
+      if (revocable) {
+        newResource.setRevocable(Resource.RevocableInfo.newBuilder());
+      }
+      result.add(newResource.build());
+      r.getScalarBuilder().setValue(available - used);
+    }
+    if (!nearZero(remaining)) {
+      // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is
+      // consistent.
+      // Maybe we should consider implementing resource veto with this class to ensure that.
+      throw new Resources.InsufficientResourcesException(
+          "Insufficient resource when allocating from offer");
+    }
+    return result.build();
+  }
+
+  private static List<Resource.Builder> filterToBuilders(
+      List<Resource> resources,
+      String name,
+      Predicate<Resource> additionalFilter) {
+
+    return FluentIterable.from(resources)
+        .filter(e -> e.getName().equals(name))
+        .filter(additionalFilter)
+        .transform(Resource::toBuilder)
+        .toList();
+  }
+
+  private static List<Resource.Builder> filterToBuilderNonRevocable(
+      List<Resource> resources,
+      String name) {
+
+    return filterToBuilders(resources, name, Resources.NON_REVOCABLE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index 7c3d681..86f2667 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -23,7 +23,6 @@ import java.util.function.Predicate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Range;
@@ -43,7 +42,6 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.aurora.common.quantity.Data.BYTES;
 import static org.apache.aurora.scheduler.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
-import static org.apache.aurora.scheduler.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
 
 /**
@@ -62,6 +60,15 @@ public final class ResourceSlot {
   public static final ResourceSlot NONE =
       new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0);
 
+  /**
+   * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}.
+   */
+  public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
+      input -> Protos.Value.Range.newBuilder()
+          .setBegin(input.lowerEndpoint())
+          .setEnd(input.upperEndpoint())
+          .build();
+
   public ResourceSlot(
       double numCpus,
       Amount<Long, Data> ram,
@@ -90,26 +97,6 @@ public final class ResourceSlot {
   }
 
   /**
-   * Adapts this slot object to a list of Mesos resources.
-   *
-   * @param selectedPorts The ports selected, to be applied as concrete task ranges.
-   * @param tierInfo Task tier info.
-   * @return Mesos resources.
-   */
-  public List<Protos.Resource> toResourceList(Set<Integer> selectedPorts, TierInfo tierInfo) {
-    ImmutableList.Builder<Protos.Resource> resourceBuilder =
-        ImmutableList.<Protos.Resource>builder()
-            .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable()))
-            .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false))
-            .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false));
-    if (!selectedPorts.isEmpty()) {
-      resourceBuilder.add(makeMesosRangeResource(PORTS, selectedPorts));
-    }
-
-    return resourceBuilder.build();
-  }
-
-  /**
    * Ensures that the revocable setting on the executor and task CPU resources match.
    *
    * @param task Task to check for resource type alignment.
@@ -142,23 +129,26 @@ public final class ResourceSlot {
   /**
    * Convenience method for adapting to Mesos resources without applying a port range.
    *
-   * @see {@link #toResourceList(java.util.Set, TierInfo)}
    * @param tierInfo Task tier info.
    * @return Mesos resources.
    */
   public List<Protos.Resource> toResourceList(TierInfo tierInfo) {
-    return toResourceList(ImmutableSet.of(), tierInfo);
+    return ImmutableList.<Protos.Resource>builder()
+        .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable()))
+        .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false))
+        .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false))
+        .build();
   }
 
   /**
    * Creates a mesos resource of integer ranges.
    *
    * @param resourceType Resource type.
-   * @param values Values to translate into ranges.
-   * @return A mesos ranges resource.
+   * @param values    Values to translate into ranges.
+   * @return A new mesos ranges resource.
    */
   @VisibleForTesting
-  static Protos.Resource makeMesosRangeResource(
+  public static Protos.Resource makeMesosRangeResource(
       ResourceType resourceType,
       Set<Integer> values) {
 
@@ -348,10 +338,4 @@ public final class ResourceSlot {
   };
 
   private static final Predicate<Integer> IS_ZERO = e -> e == 0;
-
-  private static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM =
-      input -> Protos.Value.Range.newBuilder()
-          .setBegin(input.lowerEndpoint())
-          .setEnd(input.upperEndpoint())
-          .build();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index db422a9..4baf9dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -62,6 +62,14 @@ public final class Resources {
    */
   public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
 
+  /**
+   * Convert range to set of integers.
+   */
+  public static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
+      range -> ContiguousSet.create(
+          com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
+          DiscreteDomain.integers());
+
   private final Iterable<Resource> mesosResources;
 
   private Resources(Iterable<Resource> mesosResources) {
@@ -145,38 +153,33 @@ public final class Resources {
   }
 
   private double getScalarValue(String key) {
-    Resource resource = getResource(key);
-    if (resource == null) {
-      return 0;
+    Iterable<Resource> resources = getResources(key);
+    double value = 0;
+    for (Resource r : resources) {
+      value += r.getScalar().getValue();
     }
-
-    return resource.getScalar().getValue();
+    return value;
   }
 
-  private Resource getResource(String key) {
-    return Iterables.find(mesosResources, e -> e.getName().equals(key), null);
+  private Iterable<Resource> getResources(String key) {
+    return Iterables.filter(mesosResources, e -> e.getName().equals(key));
   }
 
   private Iterable<Range> getPortRanges() {
-    Resource resource = getResource(PORTS.getName());
-    if (resource == null) {
-      return ImmutableList.of();
+    ImmutableList.Builder<Range> ranges = ImmutableList.builder();
+    for (Resource r : getResources(PORTS.getName())) {
+      ranges.addAll(r.getRanges().getRangeList().iterator());
     }
 
-    return resource.getRanges().getRangeList();
+    return ranges.build();
   }
 
   /**
    * Thrown when there are insufficient resources to satisfy a request.
    */
-  static class InsufficientResourcesException extends RuntimeException {
+  public static class InsufficientResourcesException extends RuntimeException {
     InsufficientResourcesException(String message) {
       super(message);
     }
   }
-
-  private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
-      range -> ContiguousSet.create(
-          com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
-          DiscreteDomain.integers());
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
index 2255dd4..7de8f4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
@@ -89,6 +89,12 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
       help = "Allows receiving revocable resource offers from Mesos.")
   private static final Arg<Boolean> RECEIVE_REVOCABLE_RESOURCES = Arg.create(false);
 
+  @CmdLine(name = "mesos_role",
+      help = "The Mesos role this framework will register as. The default is to leave this empty, "
+          + "and the framework will register without any role and only receive unreserved "
+          + "resources in offer.")
+  private static final Arg<String> MESOS_ROLE = Arg.create();
+
   // TODO(wfarner): Figure out a way to change this without risk of fallout (MESOS-703).
   private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
 
@@ -99,6 +105,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
     if (FRAMEWORK_ANNOUNCE_PRINCIPAL.get() && credentials.isPresent()) {
       principal = Optional.of(credentials.get().getPrincipal());
     }
+    Optional<String> role =
+        MESOS_ROLE.hasAppliedValue() ? Optional.of(MESOS_ROLE.get()) : Optional.absent();
     DriverSettings settings = new DriverSettings(
         MESOS_MASTER_ADDRESS.get(),
         credentials,
@@ -106,7 +114,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
             EXECUTOR_USER.get(),
             principal,
             FRAMEWORK_FAILOVER_TIMEOUT.get(),
-            RECEIVE_REVOCABLE_RESOURCES.get()));
+            RECEIVE_REVOCABLE_RESOURCES.get(),
+            role));
     bind(DriverSettings.class).toInstance(settings);
   }
 
@@ -138,7 +147,8 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
       String executorUser,
       Optional<String> principal,
       Amount<Long, Time> failoverTimeout,
-      boolean revocable) {
+      boolean revocable,
+      Optional<String> role) {
 
     FrameworkInfo.Builder infoBuilder = FrameworkInfo.newBuilder()
         .setUser(executorUser)
@@ -153,6 +163,11 @@ public class CommandLineDriverSettingsModule extends AbstractModule {
     if (revocable) {
       infoBuilder.addCapabilities(Capability.newBuilder().setType(REVOCABLE_RESOURCES));
     }
+
+    if (role.isPresent()) {
+      infoBuilder.setRole(role.get());
+    }
+
     return infoBuilder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index 8fdadda..fcad0e7 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -24,7 +24,9 @@ import com.google.protobuf.ByteString;
 
 import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.scheduler.AcceptedOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.SchedulerException;
@@ -38,6 +40,7 @@ import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ContainerInfo;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -56,11 +59,11 @@ public interface MesosTaskFactory {
    * Creates a mesos task object.
    *
    * @param task Assigned task to translate into a task object.
-   * @param slaveId Id of the slave the task is being assigned to.
+   * @param offer Resource offer the task is being assigned to.
    * @return A new task.
    * @throws SchedulerException If the task could not be encoded.
    */
-  TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+  TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException;
 
   // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
   class MesosTaskFactoryImpl implements MesosTaskFactory {
@@ -95,9 +98,11 @@ public interface MesosTaskFactory {
     }
 
     @Override
-    public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+    public TaskInfo createFrom(IAssignedTask task, Offer offer) throws SchedulerException {
       requireNonNull(task);
-      requireNonNull(slaveId);
+      requireNonNull(offer);
+
+      SlaveID slaveId = offer.getSlaveId();
 
       byte[] taskInBytes;
       try {
@@ -108,13 +113,21 @@ public interface MesosTaskFactory {
       }
 
       ITaskConfig config = task.getTask();
-
+      AcceptedOffer acceptedOffer;
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
-      List<Resource> resources = ResourceSlot.from(config).toResourceList(
-          task.isSetAssignedPorts()
-              ? ImmutableSet.copyOf(task.getAssignedPorts().values())
-              : ImmutableSet.of(),
-          tierManager.getTier(task.getTask()));
+      try {
+        acceptedOffer = AcceptedOffer.create(
+            offer,
+            ResourceSlot.from(config),
+            executorSettings.getExecutorOverhead(),
+            task.isSetAssignedPorts()
+                ? ImmutableSet.copyOf(task.getAssignedPorts().values())
+                : ImmutableSet.of(),
+            tierManager.getTier(task.getTask()));
+      } catch (Resources.InsufficientResourcesException e) {
+        throw new SchedulerException(e);
+      }
+      List<Resource> resources = acceptedOffer.getTaskResources();
 
       LOG.debug(
           "Setting task resources to {}",
@@ -128,9 +141,9 @@ public interface MesosTaskFactory {
               .setData(ByteString.copyFrom(taskInBytes));
 
       if (config.getContainer().isSetMesos()) {
-        configureTaskForNoContainer(task, config, taskBuilder);
+        configureTaskForNoContainer(task, config, taskBuilder, acceptedOffer);
       } else if (config.getContainer().isSetDocker()) {
-        configureTaskForDockerContainer(task, config, taskBuilder);
+        configureTaskForDockerContainer(task, config, taskBuilder, acceptedOffer);
       } else {
         throw new SchedulerException("Task had no supported container set.");
       }
@@ -141,15 +154,17 @@ public interface MesosTaskFactory {
     private void configureTaskForNoContainer(
         IAssignedTask task,
         ITaskConfig config,
-        TaskInfo.Builder taskBuilder) {
+        TaskInfo.Builder taskBuilder,
+        AcceptedOffer acceptedOffer) {
 
-      taskBuilder.setExecutor(configureTaskForExecutor(task, config).build());
+      taskBuilder.setExecutor(configureTaskForExecutor(task, config, acceptedOffer).build());
     }
 
     private void configureTaskForDockerContainer(
         IAssignedTask task,
         ITaskConfig taskConfig,
-        TaskInfo.Builder taskBuilder) {
+        TaskInfo.Builder taskBuilder,
+        AcceptedOffer acceptedOffer) {
 
       IDockerContainer config = taskConfig.getContainer().getDocker();
       Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
@@ -164,7 +179,7 @@ public interface MesosTaskFactory {
 
       configureContainerVolumes(containerBuilder);
 
-      ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig)
+      ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, taskConfig, acceptedOffer)
           .setContainer(containerBuilder.build());
 
       taskBuilder.setExecutor(execBuilder.build());
@@ -172,11 +187,18 @@ public interface MesosTaskFactory {
 
     private ExecutorInfo.Builder configureTaskForExecutor(
         IAssignedTask task,
-        ITaskConfig config) {
+        ITaskConfig config,
+        AcceptedOffer acceptedOffer) {
 
-      return executorSettings.getExecutorConfig().getExecutor().toBuilder()
+      ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
           .setExecutorId(getExecutorId(task.getTaskId()))
           .setSource(getInstanceSourceName(config, task.getInstanceId()));
+      List<Resource> executorResources = acceptedOffer.getExecutorResources();
+      LOG.debug(
+          "Setting executor resources to {}",
+          Iterables.transform(executorResources, Protobufs::toString));
+      builder.clearResources().addAllResources(executorResources);
+      return builder;
     }
 
     private void configureContainerVolumes(ContainerInfo.Builder containerBuilder) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7e8e456..0c467a6 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -126,7 +126,7 @@ public interface TaskAssigner {
       LOG.info(
           "Offer on slave {} (id {}) is being assigned task for {}.",
           host, offer.getSlaveId().getValue(), taskId);
-      return taskFactory.createFrom(assigned, offer.getSlaveId());
+      return taskFactory.createFrom(assigned, offer);
     }
 
     @Timed("assigner_maybe_assign")

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
new file mode 100644
index 0000000..39096af
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/AcceptedOfferTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Resource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AcceptedOfferTest {
+
+  private static final Optional<String> TEST_ROLE = Optional.of("test-role");
+  private static final Optional<String> ABSENT_ROLE = Optional.absent();
+  private static final ResourceSlot TASK_SLOT = new ResourceSlot(
+      4, Amount.of(100L, Data.MB), Amount.of(200L, Data.MB), 0);
+  private static final ResourceSlot EXECUTOR_SLOT = new ResourceSlot(
+      0.25, Amount.of(25L, Data.MB), Amount.of(75L, Data.MB), 0);
+  private static final ResourceSlot TOTAL_SLOT = EXECUTOR_SLOT.add(TASK_SLOT);
+  private static final Integer[] TASK_PORTS = {80, 90};
+  private static final Set<Integer> TASK_PORTS_SET = ImmutableSet.copyOf(TASK_PORTS);
+
+  @Test
+  public void testReservedPredicates() {
+    Protos.Resource withRole = makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, 1.0);
+    assertTrue(AcceptedOffer.RESERVED.apply(withRole));
+    assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole));
+    Protos.Resource absentRole = makeScalar(ResourceType.CPUS.getName(), ABSENT_ROLE, false, 1.0);
+    assertFalse(AcceptedOffer.RESERVED.apply(absentRole));
+    assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole));
+  }
+
+  @Test
+  public void testAllocateEmpty() {
+    AcceptedOffer acceptedOffer = AcceptedOffer.create(
+        fakeOffer(Collections.emptyList()),
+        ResourceSlot.NONE,
+        ResourceSlot.NONE,
+        ImmutableSet.of(),
+        TierInfo.DEFAULT);
+    assertEquals(Collections.emptyList(), acceptedOffer.getTaskResources());
+    assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
+  }
+
+  @Test
+  public void testAllocateRange() {
+    List<Resource> resources = ImmutableList.<Resource>builder()
+        .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93))
+        .add(makePortResource(TEST_ROLE, 100, 101))
+        .build();
+    AcceptedOffer acceptedOffer = AcceptedOffer.create(
+        fakeOffer(resources),
+        ResourceSlot.NONE,
+        ResourceSlot.NONE,
+        ImmutableSet.of(80, 90, 100),
+        TierInfo.DEFAULT);
+
+    List<Resource> expected = ImmutableList.<Resource>builder()
+        // Reserved resources are preferred and handled before non-reserved ones, so the
+        // result should list the ports from the reserved resources first.
+        .add(makePortResource(TEST_ROLE, 100))
+        .add(makePortResource(Optional.absent(), 80, 90))
+        .build();
+    assertEquals(expected, acceptedOffer.getTaskResources());
+    assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources());
+  }
+
+  @Test(expected = Resources.InsufficientResourcesException.class)
+  public void testAllocateRangeInsufficent() {
+    List<Resource> resources = ImmutableList.of(
+        makePortResource(ABSENT_ROLE, 80),
+        makePortResource(ABSENT_ROLE, 100, 101));
+    AcceptedOffer.create(
+        fakeOffer(resources),
+        ResourceSlot.NONE,
+        ResourceSlot.NONE,
+        ImmutableSet.of(80, 90, 100),
+        TierInfo.DEFAULT);
+  }
+
+  @Test
+  public void testAllocateSingleRole() {
+    runAllocateSingleRole(ABSENT_ROLE, false);
+    runAllocateSingleRole(ABSENT_ROLE, true);
+    runAllocateSingleRole(TEST_ROLE, false);
+    runAllocateSingleRole(TEST_ROLE, true);
+  }
+
+  private void runAllocateSingleRole(Optional<String> role, boolean cpuRevocable) {
+    List<Resource> resources = ImmutableList.<Resource>builder()
+        .add(makeScalar(
+            ResourceType.CPUS.getName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(), role, false, TOTAL_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(role, TASK_PORTS))
+        .build();
+    Protos.Offer offer = fakeOffer(resources);
+
+    AcceptedOffer offerAllocation = AcceptedOffer.create(
+        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable));
+
+    List<Resource> taskList = ImmutableList.<Resource>builder()
+        .add(makeScalar(ResourceType.CPUS.getName(), role, cpuRevocable, TASK_SLOT.getNumCpus()))
+        .add(makeScalar(ResourceType.RAM_MB.getName(), role, false, TASK_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), role, false, TASK_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(role, TASK_PORTS))
+        .build();
+    assertEquals(taskList, offerAllocation.getTaskResources());
+
+    List<Resource> executorList = ImmutableList.<Resource>builder()
+        .add(makeScalar(
+            ResourceType.CPUS.getName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+        .build();
+    assertEquals(executorList, offerAllocation.getExecutorResources());
+  }
+
+  @Test(expected = Resources.InsufficientResourcesException.class)
+  public void testAllocateSingleRoleInsufficient() {
+    List<Resource> resources = ImmutableList.<Resource>builder()
+        // EXECUTOR_SLOT's CPU is not included here.
+        .add(makeScalar(ResourceType.CPUS.getName(), TEST_ROLE, false, TASK_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(TEST_ROLE, TASK_PORTS))
+        .build();
+    Protos.Offer offer = fakeOffer(resources);
+
+    AcceptedOffer.create(
+        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false));
+  }
+
+  @Test
+  public void testMultipleRoles() {
+    runMultipleRoles(false);
+    runMultipleRoles(true);
+  }
+
+  private void runMultipleRoles(boolean cpuRevocable) {
+    List<Resource> resources = ImmutableList.<Resource>builder()
+        // Make cpus come from two roles.
+        .add(makeScalar(
+            ResourceType.CPUS.getName(),
+            TEST_ROLE,
+            cpuRevocable,
+            EXECUTOR_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.CPUS.getName(),
+            ABSENT_ROLE,
+            cpuRevocable,
+            TASK_SLOT.getNumCpus()))
+        // Make ram come from the default role.
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(),
+            ABSENT_ROLE,
+            false,
+            TOTAL_SLOT.getRam().as(Data.MB)))
+        // Make disk come from non-default role.
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(),
+            TEST_ROLE,
+            false,
+            TOTAL_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(TEST_ROLE, 80))
+        .add(makePortResource(ABSENT_ROLE, 90))
+        .build();
+
+    Protos.Offer offer = fakeOffer(resources);
+
+    AcceptedOffer offerAllocation = AcceptedOffer.create(
+        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(cpuRevocable));
+
+    List<Resource> taskList = ImmutableList.<Resource>builder()
+        // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus.
+        .add(makeScalar(
+            ResourceType.CPUS.getName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.CPUS.getName(),
+            ABSENT_ROLE,
+            cpuRevocable,
+            TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus()))
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(TEST_ROLE, 80))
+        .add(makePortResource(ABSENT_ROLE, 90))
+        .build();
+    assertEquals(taskList, offerAllocation.getTaskResources());
+
+    List<Resource> executorList = ImmutableList.<Resource>builder()
+        .add(makeScalar(
+            ResourceType.CPUS.getName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB)))
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB)))
+        .build();
+    assertEquals(executorList, offerAllocation.getExecutorResources());
+  }
+
+  @Test(expected = Resources.InsufficientResourcesException.class)
+  public void testMultipleRolesInsufficient() {
+    // Similar to testMultipleRoles, but with the TEST_ROLE cpus marked as revocable.
+    List<Resource> resources = ImmutableList.<Resource>builder()
+        // Make cpus come from two roles.
+        .add(makeScalar(
+            ResourceType.CPUS.getName(),
+            TEST_ROLE,
+            true,
+            EXECUTOR_SLOT.getNumCpus()))
+        .add(makeScalar(
+            ResourceType.CPUS.getName(),
+            ABSENT_ROLE,
+            false,
+            TASK_SLOT.getNumCpus()))
+        // Make ram come from the default role.
+        .add(makeScalar(
+            ResourceType.RAM_MB.getName(),
+            ABSENT_ROLE,
+            false,
+            TOTAL_SLOT.getRam().as(Data.MB)))
+        // Make disk come from non-default role.
+        .add(makeScalar(
+            ResourceType.DISK_MB.getName(),
+            TEST_ROLE,
+            false,
+            TOTAL_SLOT.getDisk().as(Data.MB)))
+        .add(makePortResource(TEST_ROLE, 80))
+        .add(makePortResource(ABSENT_ROLE, 90))
+        .build();
+    Protos.Offer offer = fakeOffer(resources);
+    // There are not enough non-revocable resources to satisfy a non-revocable request.
+    AcceptedOffer.create(
+        offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false));
+  }
+
+  private static Resource makePortResource(Optional<String> role, Integer... values) {
+    Resource.Builder prototype = Resource.newBuilder()
+        .setType(Protos.Value.Type.RANGES)
+        .setName(ResourceType.PORTS.getName());
+    if (role.isPresent()) {
+      prototype.setRole(role.get());
+    }
+    return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values));
+  }
+
+  private static Resource makeScalar(
+      String name, Optional<String> role, boolean revocable, double value) {
+    Resource.Builder resource = Resource.newBuilder()
+        .setName(name)
+        .setType(Protos.Value.Type.SCALAR)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
+    if (role.isPresent()) {
+      resource.setRole(role.get());
+    }
+    if (revocable) {
+      resource.setRevocable(Resource.RevocableInfo.getDefaultInstance());
+    }
+    return resource.build();
+  }
+
+  private static Protos.Offer fakeOffer(List<Resource> resources) {
+    return Protos.Offer.newBuilder()
+        .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+        .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+        .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
+        .setHostname("hostname")
+        .addAllResources(resources)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
index e4ae943..52113b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
@@ -88,29 +88,25 @@ public class ResourceSlotTest {
   }
 
   @Test
-  public void testToResourceListNoRevocable() {
+  public void testToResourceListNoRevoca() {
     ResourceSlot resources = ResourceSlot.from(TASK);
-    Set<Integer> ports = ImmutableSet.of(80, 443);
     assertEquals(
         ImmutableSet.of(
             makeMesosResource(CPUS, TASK.getNumCpus(), false),
             makeMesosResource(RAM_MB, TASK.getRamMb(), false),
-            makeMesosResource(DISK_MB, TASK.getDiskMb(), false),
-            makeMesosRangeResource(PORTS, ports)),
-        ImmutableSet.copyOf(resources.toResourceList(ports, DEFAULT)));
+            makeMesosResource(DISK_MB, TASK.getDiskMb(), false)),
+        ImmutableSet.copyOf(resources.toResourceList(DEFAULT)));
   }
 
   @Test
   public void testToResourceListRevocable() {
     ResourceSlot resources = ResourceSlot.from(TASK);
-    Set<Integer> ports = ImmutableSet.of(80, 443);
     assertEquals(
         ImmutableSet.of(
             makeMesosResource(CPUS, TASK.getNumCpus(), true),
             makeMesosResource(RAM_MB, TASK.getRamMb(), false),
-            makeMesosResource(DISK_MB, TASK.getDiskMb(), false),
-            makeMesosRangeResource(PORTS, ports)),
-        ImmutableSet.copyOf(resources.toResourceList(ports, REVOCABLE_TIER)));
+            makeMesosResource(DISK_MB, TASK.getDiskMb(), false)),
+        ImmutableSet.copyOf(resources.toResourceList(REVOCABLE_TIER)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
index 33149ab..dc964b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
@@ -28,9 +28,13 @@ import org.junit.Test;
 
 import static org.apache.mesos.Protos.FrameworkInfo.Capability.Type.REVOCABLE_RESOURCES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class CommandLineDriverSettingsModuleTest {
 
+  private static final String TEST_ROLE = "test-role";
+
   @Test(expected = IllegalStateException.class)
   public void testMissingPropertiesParsing() {
     Properties testProperties = new Properties();
@@ -72,9 +76,11 @@ public class CommandLineDriverSettingsModuleTest {
         "user",
         Optional.absent(),
         Amount.of(1L, Time.MINUTES),
-        false);
+        false,
+        Optional.absent());
     assertEquals("", info.getPrincipal());
     assertEquals(0, info.getCapabilitiesCount());
+    assertFalse(info.hasRole());
   }
 
   @Test
@@ -83,10 +89,12 @@ public class CommandLineDriverSettingsModuleTest {
         "user",
         Optional.absent(),
         Amount.of(1L, Time.MINUTES),
-        true);
+        true,
+        Optional.absent());
     assertEquals("", info.getPrincipal());
     assertEquals(1, info.getCapabilitiesCount());
     assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType());
+    assertFalse(info.hasRole());
   }
 
   @Test
@@ -95,20 +103,25 @@ public class CommandLineDriverSettingsModuleTest {
         "user",
         Optional.of("auroraprincipal"),
         Amount.of(1L, Time.MINUTES),
-        false);
+        false,
+        Optional.absent());
     assertEquals("auroraprincipal", info.getPrincipal());
     assertEquals(0, info.getCapabilitiesCount());
+    assertFalse(info.hasRole());
   }
 
   @Test
-  public void testFrameworkInfoRevocableWithAnnouncedPrincipal() {
+  public void testFrameworkInfoRevocableWithAnnouncedPrincipalAndRole() {
     Protos.FrameworkInfo info = CommandLineDriverSettingsModule.buildFrameworkInfo(
         "user",
             Optional.of("auroraprincipal"),
         Amount.of(1L, Time.MINUTES),
-        true);
+        true,
+        Optional.of(TEST_ROLE));
     assertEquals("auroraprincipal", info.getPrincipal());
     assertEquals(1, info.getCapabilitiesCount());
     assertEquals(REVOCABLE_RESOURCES, info.getCapabilities(0).getType());
+    assertTrue(info.hasRole());
+    assertEquals(TEST_ROLE, info.getRole());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index a5793bf..066c6a3 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.util.stream.Collectors;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -27,6 +29,7 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MesosContainer;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.ResourceType;
 import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
@@ -37,6 +40,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo;
 import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Parameter;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -46,9 +50,11 @@ import org.apache.mesos.Protos.Volume.Mode;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource;
 import static org.apache.aurora.scheduler.TierInfo.DEFAULT;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER;
 import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG;
 import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR;
 import static org.easymock.EasyMock.expect;
@@ -85,6 +91,23 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
                       ImmutableList.of(new DockerParameter("label", "testparameter")))))));
 
   private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
+  private static final Offer OFFER_THERMOS_EXECUTOR = Protos.Offer.newBuilder()
+      .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
+      .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
+      .setSlaveId(SLAVE)
+      .setHostname("slave-hostname")
+      .addAllResources(
+          ResourceSlot.from(TASK_CONFIG).add(THERMOS_EXECUTOR.getExecutorOverhead())
+              .toResourceList(DEFAULT))
+      .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80)))
+      .build();
+  private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
+      .clearResources()
+      .addAllResources(
+              ResourceSlot.from(TASK_CONFIG).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead())
+                      .toResourceList(DEFAULT))
+      .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80)))
+      .build();
 
   private MesosTaskFactory taskFactory;
   private ExecutorSettings config;
@@ -106,6 +129,18 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
         .build();
   }
 
+  private static ExecutorInfo purgeZeroResources(ExecutorInfo executor) {
+    return executor.toBuilder()
+        .clearResources()
+        .addAllResources(
+            executor.getResourcesList()
+                .stream()
+                .filter(
+                    e -> !e.hasScalar() || e.getScalar().getValue() > 0)
+                .collect(Collectors.toList()))
+        .build();
+  }
+
   @Test
   public void testExecutorInfoUnchanged() {
     expect(tierManager.getTier(TASK_CONFIG)).andReturn(DEFAULT);
@@ -113,7 +148,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     control.replay();
 
-    TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+    TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
 
     assertEquals(populateDynamicFields(DEFAULT_EXECUTOR, TASK), task.getExecutor());
     checkTaskResources(TASK.getTask(), task);
@@ -124,9 +159,17 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     expect(tierManager.getTier(TASK_CONFIG)).andReturn(REVOCABLE_TIER);
     taskFactory = new MesosTaskFactoryImpl(config, tierManager);
 
+    Resource revocableCPU = OFFER_THERMOS_EXECUTOR.getResources(0).toBuilder()
+        .setRevocable(Resource.RevocableInfo.getDefaultInstance())
+        .build();
+    Offer withRevocable = OFFER_THERMOS_EXECUTOR.toBuilder()
+        .removeResources(0)
+        .addResources(0, revocableCPU)
+        .build();
+
     control.replay();
 
-    TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+    TaskInfo task = taskFactory.createFrom(TASK, withRevocable);
     checkTaskResources(TASK.getTask(), task);
     assertTrue(task.getResourcesList().stream().anyMatch(Resource::hasRevocable));
   }
@@ -142,7 +185,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     control.replay();
 
-    TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), SLAVE);
+    TaskInfo task = taskFactory.createFrom(IAssignedTask.build(builder), OFFER_THERMOS_EXECUTOR);
     checkTaskResources(ITaskConfig.build(builder.getTask()), task);
   }
 
@@ -156,9 +199,10 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     control.replay();
 
-    TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+    TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
     assertEquals(
-        populateDynamicFields(NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK),
+        purgeZeroResources(populateDynamicFields(
+            NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK)),
         task.getExecutor());
 
     // Simulate the upsizing needed for the task to meet the minimum thermos requirements.
@@ -177,13 +221,14 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
   }
 
   private TaskInfo getDockerTaskInfo(IAssignedTask task) {
-    config = TaskExecutors.SOME_OVERHEAD_EXECUTOR;
+    config = SOME_OVERHEAD_EXECUTOR;
+
     expect(tierManager.getTier(task.getTask())).andReturn(DEFAULT);
     taskFactory = new MesosTaskFactoryImpl(config, tierManager);
 
     control.replay();
 
-    return taskFactory.createFrom(task, SLAVE);
+    return taskFactory.createFrom(task, OFFER_SOME_OVERHEAD_EXECUTOR);
   }
 
   @Test
@@ -217,7 +262,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     control.replay();
 
-    TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, SLAVE);
+    TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, OFFER_THERMOS_EXECUTOR);
     assertEquals(
         config.getExecutorConfig().getVolumeMounts(),
         taskInfo.getExecutor().getContainer().getVolumesList());

http://git-wip-us.apache.org/repos/asf/aurora/blob/a80260ea/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 3cbe9ac..b00add0 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -135,7 +135,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         MESOS_OFFER.getSlaveId(),
         ImmutableMap.of(PORT_NAME, PORT)))
         .andReturn(TASK.getAssignedTask());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
         .andReturn(TASK_INFO);
 
     control.replay();
@@ -204,7 +204,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         LOST,
         LAUNCH_FAILED_MSG))
         .andReturn(StateChangeResult.SUCCESS);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
         .andReturn(TASK_INFO);
 
     control.replay();
@@ -261,7 +261,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         offer.getOffer().getSlaveId(),
         ImmutableMap.of(PORT_NAME, PORT)))
         .andReturn(TASK.getAssignedTask());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer().getSlaveId()))
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer()))
         .andReturn(TASK_INFO);
     offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
 
@@ -313,7 +313,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         OFFER.getOffer().getSlaveId(),
         ImmutableMap.of(PORT_NAME, PORT)))
         .andReturn(TASK.getAssignedTask());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer().getSlaveId()))
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
         .andReturn(TASK_INFO);
     offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);