You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by jc...@apache.org on 2016/08/04 15:30:21 UTC

aurora git commit: Multiple executor support in Scheduler

Repository: aurora
Updated Branches:
  refs/heads/master 0105a151b -> d0533d2c7


Multiple executor support in Scheduler

Adds support for using multiple executors in a single scheduler.

Reviewed at https://reviews.apache.org/r/50480/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d0533d2c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d0533d2c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d0533d2c

Branch: refs/heads/master
Commit: d0533d2c7ac15a19cc63587481a75b9597613425
Parents: 0105a15
Author: Renan DelValle <rd...@binghamton.edu>
Authored: Thu Aug 4 09:24:32 2016 -0500
Committer: Joshua Cohen <jc...@apache.org>
Committed: Thu Aug 4 09:24:32 2016 -0500

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   6 +-
 docs/operations/configuration.md                | 159 +++++++++++--------
 src/main/java/org/apache/aurora/GuavaUtils.java |  16 ++
 .../aurora/scheduler/base/TaskTestUtil.java     |  28 +++-
 .../configuration/ConfigurationManager.java     |  28 +++-
 .../configuration/executor/ExecutorConfig.java  |  17 +-
 .../configuration/executor/ExecutorModule.java  |  65 ++++----
 .../executor/ExecutorSettings.java              |  24 ++-
 .../executor/ExecutorSettingsLoader.java        |  35 ++--
 .../scheduler/mesos/MesosTaskFactory.java       |  57 +++++--
 .../scheduler/mesos/TestExecutorSettings.java   |  23 ++-
 .../preemptor/PreemptionVictimFilter.java       |  18 ++-
 .../scheduler/scheduling/TaskScheduler.java     |  12 +-
 .../configuration/ConfigurationManagerTest.java |  16 +-
 .../executor/ExecutorSettingsLoaderTest.java    |  69 ++++++--
 .../filter/SchedulingFilterImplTest.java        |   6 +-
 .../mesos/MesosTaskFactoryImplTest.java         |  57 ++++---
 .../preemptor/PreemptionVictimFilterTest.java   |   5 +-
 .../scheduling/TaskSchedulerImplTest.java       |   5 +-
 .../aurora/scheduler/thrift/Fixtures.java       |   3 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |  24 +--
 .../executor/test-missing-field.json            |   8 +-
 .../executor/test-multiple-executor.json        | 100 ++++++++++++
 .../executor/test-single-executor.json          |  51 ++++++
 .../executor/test-thermos-executor.json         |  48 ------
 25 files changed, 630 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index d14c3d7..8c6b860 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -17,12 +17,16 @@
   from Mesos. This has affected rendering of some of the existing attributes. Furthermore, it now
   dumps additional offer attributes including [reservations](http://mesos.apache.org/documentation/latest/reservation/)
   and [persistent volumes](http://mesos.apache.org/documentation/latest/persistent-volume/).
-* The scheduler API now accepts both thrift JSON and binary thrift. If a request is sent with a
+- The scheduler API now accepts both thrift JSON and binary thrift. If a request is sent with a
   `Content-Type` header, or a `Content-Type` header of `application/x-thrift` or `application/json`
   or `application/vnd.apache.thrift.json` the request is treated as thrift JSON. If a request is
   sent with a `Content-Type` header of `application/vnd.apache.thrift.binary` the request is treated
   as binary thrift. If the `Accept` header of the request is `application/vnd.apache.thrift.binary`
   then the response will be binary thrift. Any other value for `Accept` will result in thrift JSON.
+- Scheduler is now able to launch jobs using more than one executor at a time. To use this feature
+  the `-custom_executor_config` flag must point to a JSON file which contains at least one valid
+  executor configuration as detailed in the [configuration](http://aurora.apache.org/documentation/latest/operations/configuration/)
+  documentation.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index 0615c54..bbcffac 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -142,16 +142,24 @@ If you need to do computation before starting the thermos executor (for example,
 For example, to wrap the executor inside a simple wrapper, the scheduler will be started like this
 `-thermos_executor_path=/path/to/wrapper.sh -thermos_executor_resources=/usr/share/aurora/bin/thermos_executor.pex`
 
-## Custom Executor\u2028\u2028
+## Custom Executors
 
 If the need arises to use a Mesos executor other than the Thermos executor, the scheduler can be
 configured to utilize a custom executor by specifying the `-custom_executor_config` flag.
-The flag must be set to the path of a valid executor configuration file.\u2028
+The flag must be set to the path of a valid executor configuration file.
 
-The configuration file must be valid JSON and contain, at minimum, the name, command and resources fields.
+The configuration file must be a valid **JSON array** and contain, at minimum,
+one executor configuration including the name, command and resources fields.
 
+### Array Entry
 
-### executor
+Property                 | Description
+-----------------------  | ---------------------------------
+executor (required)      | Description of executor.
+task_prefix (required) ) | Prefix given to tasks launched with this executor's configuration.
+volume_mounts (optional) | Volumes to be mounted in container running executor.
+
+#### executor
 
 Property                 | Description
 -----------------------  | ---------------------------------
@@ -166,6 +174,20 @@ Property                 | Description
 value (required)         | The command to execute.
 arguments (optional)     | A list of arguments to pass to the command.
 uris (optional)          | List of resources to download into the task sandbox.
+shell (optional)         | Run executor via shell.
+
+A note on the command property (from [mesos.proto](https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto)):
+```
+1) If 'shell == true', the command will be launched via shell
+   (i.e., /bin/sh -c 'value'). The 'value' specified will be
+   treated as the shell command. The 'arguments' will be ignored.
+2) If 'shell == false', the command will be launched by passing
+   arguments to an executable. The 'value' specified will be
+   treated as the filename of the executable. The 'arguments'
+   will be treated as the arguments to the executable. This is
+   similar to how POSIX exec families launch processes (i.e.,
+   execlp(value, arguments(0), arguments(1), ...)).
+```
 
 ##### uris (list)
 * Follows the [Mesos Fetcher schema](http://mesos.apache.org/documentation/latest/fetcher/)
@@ -193,73 +215,80 @@ host_path (required)      | Host path to mount inside the container.
 container_path (required) | Path inside the container where `host_path` will be mounted.
 mode (required)           | Mode in which to mount the volume, Read-Write (RW) or Read-Only (RO).
 
-
-A sample configuration is as follows:\u2028
+A sample configuration is as follows:
 ```
-     {
-       "executor": {
-         "name": "myExecutor",
-         "command": {
-           "value": "myExecutor.sh",
-           "arguments": [
-             "localhost:2181",
-             "-verbose",
-             "-config myConfiguration.config"
-           ],
-           "uris": [
-             {
-               "value": "/dist/myExecutor.sh",
-               "executable": true,
-               "extract": false,
-               "cache": true
-             },
-             {
-               "value": "/home/user/myConfiguration.config",
-               "executable": false,
-               "extract": false,
-               "cache": false
-             }
-           ]
-         },
-         "resources": [
-           {
-             "name": "cpus",
-             "type": "SCALAR",
-             "scalar": {
-               "value": 1.00
-             }
-           },
-           {
-             "name": "mem",
-             "type": "SCALAR",
-             "scalar": {
-               "value": 512
-             }
-           }
-         ]
-       },
-       "volume_mounts": [
-         {
-           "mode": "RO",
-           "container_path": "/path/on/container",
-           "host_path": "/path/to/host/directory"
-         },
-         {
-           "mode": "RW",
-           "container_path": "/container",
-           "host_path": "/host"
-         }
-       ]
-     }
+[
+    {
+      "executor": {
+        "name": "myExecutor",
+        "command": {
+          "value": "myExecutor.a",
+          "shell": "false",
+          "arguments": [
+            "localhost:2181",
+            "-verbose",
+            "-config myConfiguration.config"
+          ],
+          "uris": [
+            {
+              "value": "/dist/myExecutor.a",
+              "executable": true,
+              "extract": false,
+              "cache": true
+            },
+            {
+              "value": "/home/user/myConfiguration.config",
+              "executable": false,
+              "extract": false,
+              "cache": false
+            }
+          ]
+        },
+        "resources": [
+          {
+            "name": "cpus",
+            "type": "SCALAR",
+            "scalar": {
+              "value": 1.00
+            }
+          },
+          {
+            "name": "mem",
+            "type": "SCALAR",
+            "scalar": {
+              "value": 512
+            }
+          }
+        ]
+      },
+      "volume_mounts": [
+        {
+          "mode": "RO",
+          "container_path": "/path/on/container",
+          "host_path": "/path/to/host/directory"
+        },
+        {
+          "mode": "RW",
+          "container_path": "/container",
+          "host_path": "/host"
+        }
+      ],
+      "task_prefix": "my-executor-"
+    }
+]
+
 ```
 
 It should be noted that if you do not use thermos or a thermos based executor, links in the scheduler's
-Web UI for tasks\u2028 will not work (at least for the time being).
+Web UI for tasks will not work (at least for the time being).
 Some information about launched tasks can still be accessed via the Mesos Web UI or via the Aurora Client.
-Furthermore, this configuration replaces the default thermos executor.
-Work is in progress to allow support for multiple executors to co-exist within a single scheduler.
 
-### Docker containers
+### A note on increasing executor overhead
+
+Increasing executor overhead on an existing cluster, whether it be for custom executors or for thermos,
+will result in degraded preemption performance until all task with lesser overhead are preempted/restarted.
+
+## Docker containers
 In order for Aurora to launch jobs using docker containers, a few extra configuration options
 must be set.  The [docker containerizer](http://mesos.apache.org/documentation/latest/docker-containerizer/)
 must be enabled on the Mesos agents by launching them with the `--containerizers=docker,mesos` option.

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java
index 8c2ab57..9d2c43f 100644
--- a/src/main/java/org/apache/aurora/GuavaUtils.java
+++ b/src/main/java/org/apache/aurora/GuavaUtils.java
@@ -15,9 +15,11 @@ package org.apache.aurora;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.stream.Collector;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Service;
@@ -79,6 +81,20 @@ public final class GuavaUtils {
   }
 
   /**
+   * Collector to create a Guava ImmutableMap.
+   */
+  public static <T, K, V> Collector<T, ?, ImmutableMap<K, V>> toImmutableMap(
+      Function<? super T, ? extends K> keyMapper,
+      Function<? super T, ? extends V> valueMapper) {
+    return Collector.of(
+        ImmutableMap.Builder<K, V>::new,
+        (r, t) -> r.put(keyMapper.apply(t), valueMapper.apply(t)),
+        (l, r) -> l.putAll(r.build()),
+        ImmutableMap.Builder::build,
+        UNORDERED);
+  }
+
+  /**
    * Interface for mocking. The Guava ServiceManager class is final.
    */
   public interface ServiceManagerIface {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index c43e04a..3bd22a0 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -27,7 +27,6 @@ import org.apache.aurora.gen.Container;
 import org.apache.aurora.gen.Container._Fields;
 import org.apache.aurora.gen.DockerContainer;
 import org.apache.aurora.gen.DockerParameter;
-import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.MesosFetcherURI;
@@ -39,15 +38,21 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.TierInfo;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.TierManager.TierManagerImpl.TierConfig;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
 
 /**
  * Convenience methods for working with tasks.
@@ -81,8 +86,23 @@ public final class TaskTestUtil {
           true,
           true,
           true);
+  public static final ExecutorID EXECUTOR_ID = ExecutorID.newBuilder()
+      .setValue("PLACEHOLDER")
+      .build();
+  public static final ExecutorInfo EXECUTOR_INFO = ExecutorInfo.newBuilder()
+      .setExecutorId(EXECUTOR_ID)
+      .setName(apiConstants.AURORA_EXECUTOR_NAME)
+      .setCommand(Protos.CommandInfo.newBuilder().build()).build();
+  public static final ExecutorSettings EXECUTOR_SETTINGS = new ExecutorSettings(
+      ImmutableMap.<String, ExecutorConfig>builder()
+          .put(EXECUTOR_INFO.getName(), new ExecutorConfig(EXECUTOR_INFO, ImmutableList.of(), ""))
+          .build(),
+      false);
   public static final ConfigurationManager CONFIGURATION_MANAGER =
-      new ConfigurationManager(CONFIGURATION_MANAGER_SETTINGS, TIER_MANAGER, THRIFT_BACKFILL);
+      new ConfigurationManager(CONFIGURATION_MANAGER_SETTINGS,
+          TIER_MANAGER,
+          THRIFT_BACKFILL,
+          EXECUTOR_SETTINGS);
 
   private TaskTestUtil() {
     // Utility class.
@@ -115,7 +135,9 @@ public final class TaskTestUtil {
         .setMesosFetcherUris(ImmutableSet.of(
             new MesosFetcherURI("pathA").setExtract(true).setCache(true),
             new MesosFetcherURI("pathB").setExtract(true).setCache(true)))
-        .setExecutorConfig(new ExecutorConfig("name", "config"))
+        .setExecutorConfig(new org.apache.aurora.gen.ExecutorConfig(
+            EXECUTOR_INFO.getName(),
+            "config"))
         .setContainer(Container.docker(
             new DockerContainer("imagename")
                 .setParameters(ImmutableList.of(

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 93ed360..701f79c 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -38,6 +38,7 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.UserProvidedStrings;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
@@ -138,16 +139,19 @@ public class ConfigurationManager {
   private final ConfigurationManagerSettings settings;
   private final TierManager tierManager;
   private final ThriftBackfill thriftBackfill;
+  private final ExecutorSettings executorSettings;
 
   @Inject
   public ConfigurationManager(
       ConfigurationManagerSettings settings,
       TierManager tierManager,
-      ThriftBackfill thriftBackfill) {
+      ThriftBackfill thriftBackfill,
+      ExecutorSettings executorSettings) {
 
     this.settings = requireNonNull(settings);
     this.tierManager = requireNonNull(tierManager);
     this.thriftBackfill = requireNonNull(thriftBackfill);
+    this.executorSettings = requireNonNull(executorSettings);
   }
 
   private static String getRole(IValueConstraint constraint) {
@@ -223,6 +227,13 @@ public class ConfigurationManager {
   @VisibleForTesting
   static final String MESOS_FETCHER_DISABLED =
       "Mesos Fetcher for individual jobs is disabled in this cluster.";
+
+  @VisibleForTesting
+  public static final String NO_EXECUTOR_OR_CONTAINER = "Configuration may not be null.";
+
+  @VisibleForTesting
+  static final String INVALID_EXECUTOR_CONFIG = "Executor name may not be left unset.";
+
   /**
    * Check validity of and populates defaults in a task configuration.  This will return a deep copy
    * of the provided task configuration with default configuration values applied, and configuration
@@ -259,7 +270,20 @@ public class ConfigurationManager {
     if (!builder.isSetExecutorConfig()
         && !(builder.isSetContainer() && builder.getContainer().isSetDocker())) {
 
-      throw new TaskDescriptionException("Configuration may not be null");
+      throw new TaskDescriptionException(NO_EXECUTOR_OR_CONTAINER);
+    }
+
+    // Docker containers don't require executors, validate the rest
+    if (builder.isSetExecutorConfig()) {
+
+      if (!builder.getExecutorConfig().isSetName())  {
+        throw new TaskDescriptionException(INVALID_EXECUTOR_CONFIG);
+      }
+
+      executorSettings.getExecutorConfig(builder.getExecutorConfig().getName()).orElseThrow(
+          () -> new TaskDescriptionException("Configuration for executor '"
+              + builder.getExecutorConfig().getName()
+              + "' doesn't exist."));
     }
 
     // Maximize the usefulness of any thrown error message by checking required fields first.

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorConfig.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorConfig.java
index 24329a4..32bafb2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorConfig.java
@@ -30,10 +30,15 @@ public class ExecutorConfig {
 
   private final ExecutorInfo executor;
   private final List<Volume> volumeMounts;
+  private final String taskPrefix;
 
-  public ExecutorConfig(ExecutorInfo executor, List<Volume> volumeMounts) {
+  public ExecutorConfig(
+      ExecutorInfo executor,
+      List<Volume> volumeMounts,
+      String taskPrefix) {
     this.executor = requireNonNull(executor);
     this.volumeMounts = requireNonNull(volumeMounts);
+    this.taskPrefix = requireNonNull(taskPrefix);
   }
 
   public ExecutorInfo getExecutor() {
@@ -44,6 +49,10 @@ public class ExecutorConfig {
     return volumeMounts;
   }
 
+  public String getTaskPrefix() {
+    return taskPrefix;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof ExecutorConfig)) {
@@ -52,12 +61,13 @@ public class ExecutorConfig {
 
     ExecutorConfig other = (ExecutorConfig) obj;
     return Objects.equals(executor, other.executor)
-        && Objects.equals(volumeMounts, other.volumeMounts);
+        && Objects.equals(volumeMounts, other.volumeMounts)
+        && Objects.equals(taskPrefix, other.taskPrefix);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(executor, volumeMounts);
+    return Objects.hash(executor, volumeMounts, taskPrefix);
   }
 
   @Override
@@ -65,6 +75,7 @@ public class ExecutorConfig {
     return MoreObjects.toStringHelper(this)
         .add("executor", executor)
         .add("volumeMounts", volumeMounts)
+        .add("taskPrefix", taskPrefix)
         .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java
index 8b7f8dc..0d6a8c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java
@@ -23,6 +23,7 @@ import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.AbstractModule;
 
@@ -35,6 +36,7 @@ import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.gen.Volume;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.CommandInfo;
@@ -133,7 +135,7 @@ public class ExecutorModule extends AbstractModule {
         .build();
   }
 
-  private static ExecutorSettings makeThermosExecutorSettings()  {
+  private static ExecutorConfig makeThermosExecutorConfig()  {
     List<Protos.Volume> volumeMounts =
         ImmutableList.<Protos.Volume>builder()
             .addAll(Iterables.transform(
@@ -145,34 +147,41 @@ public class ExecutorModule extends AbstractModule {
                     .build()))
             .build();
 
-    return new ExecutorSettings(
-        new ExecutorConfig(
-            ExecutorInfo.newBuilder()
-                .setName("aurora.task")
-                // Necessary as executorId is a required field.
-                .setExecutorId(Executors.PLACEHOLDER_EXECUTOR_ID)
-                .setCommand(
-                    makeExecutorCommand(
-                        THERMOS_EXECUTOR_PATH.get(),
-                        THERMOS_EXECUTOR_RESOURCES.get(),
-                        THERMOS_HOME_IN_SANDBOX.get(),
-                        THERMOS_EXECUTOR_FLAGS.get()))
-                .addResources(makeResource(CPUS, EXECUTOR_OVERHEAD_CPUS.get()))
-                .addResources(makeResource(RAM_MB, EXECUTOR_OVERHEAD_RAM.get().as(Data.MB)))
-                .build(),
-            volumeMounts),
-        POPULATE_DISCOVERY_INFO.get());
+    return new ExecutorConfig(
+        ExecutorInfo.newBuilder()
+            .setName(apiConstants.AURORA_EXECUTOR_NAME)
+            // Necessary as executorId is a required field.
+            .setExecutorId(Executors.PLACEHOLDER_EXECUTOR_ID)
+            .setCommand(
+                makeExecutorCommand(
+                    THERMOS_EXECUTOR_PATH.get(),
+                    THERMOS_EXECUTOR_RESOURCES.get(),
+                    THERMOS_HOME_IN_SANDBOX.get(),
+                    THERMOS_EXECUTOR_FLAGS.get()))
+            .addResources(makeResource(CPUS, EXECUTOR_OVERHEAD_CPUS.get()))
+            .addResources(makeResource(RAM_MB, EXECUTOR_OVERHEAD_RAM.get().as(Data.MB)))
+            .build(),
+        volumeMounts,
+        "thermos-");
   }
 
-  private static ExecutorSettings makeCustomExecutorSettings() {
+  private static ExecutorSettings makeExecutorSettings() {
     try {
-      return
-          new ExecutorSettings(
-              ExecutorSettingsLoader.read(
-                  Files.newBufferedReader(
-                      CUSTOM_EXECUTOR_CONFIG.get().toPath(),
-                      StandardCharsets.UTF_8)),
-              POPULATE_DISCOVERY_INFO.get());
+
+      ImmutableMap.Builder<String, ExecutorConfig> configsBuilder = ImmutableMap.builder();
+
+      configsBuilder.put(apiConstants.AURORA_EXECUTOR_NAME, makeThermosExecutorConfig());
+
+      if (CUSTOM_EXECUTOR_CONFIG.hasAppliedValue()) {
+        configsBuilder.putAll(
+            ExecutorSettingsLoader.read(
+                Files.newBufferedReader(
+                    CUSTOM_EXECUTOR_CONFIG.get().toPath(),
+                    StandardCharsets.UTF_8)));
+      }
+
+      return new ExecutorSettings(configsBuilder.build(), POPULATE_DISCOVERY_INFO.get());
+
     } catch (ExecutorSettingsLoader.ExecutorConfigException | IOException e) {
       throw new IllegalArgumentException("Failed to read executor settings: " + e, e);
     }
@@ -180,9 +189,7 @@ public class ExecutorModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    bind(ExecutorSettings.class).toInstance(CUSTOM_EXECUTOR_CONFIG.hasAppliedValue()
-        ? makeCustomExecutorSettings()
-        : makeThermosExecutorSettings());
+    bind(ExecutorSettings.class).toInstance(makeExecutorSettings());
   }
 
   private static Resource makeResource(ResourceType type, double value) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
index e919d3f..5c987fd 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
@@ -13,7 +13,9 @@
  */
 package org.apache.aurora.scheduler.configuration.executor;
 
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
@@ -24,26 +26,32 @@ import static java.util.Objects.requireNonNull;
  * Configuration for the executor to run, and resource overhead required for it.
  */
 public class ExecutorSettings {
-  private final ExecutorConfig config;
+  private final Map<String, ExecutorConfig> config;
   private final boolean populateDiscoveryInfo;
 
-  public ExecutorSettings(ExecutorConfig config, boolean populateDiscoveryInfo) {
+  public ExecutorSettings(
+      Map<String, ExecutorConfig> config,
+      boolean populateDiscoveryInfo) {
+
     this.config = requireNonNull(config);
     this.populateDiscoveryInfo = populateDiscoveryInfo;
   }
 
-  public ExecutorConfig getExecutorConfig() {
-    // TODO(wfarner): Replace this with a generic name-based accessor once tasks can specify the
-    // executor they wish to use.
-    return config;
+  public Optional<ExecutorConfig> getExecutorConfig(String name) {
+    return Optional.ofNullable(config.get(name));
   }
 
   public boolean shouldPopulateDiscoverInfo() {
     return populateDiscoveryInfo;
   }
 
-  public ResourceBag getExecutorOverhead() {
-    return ResourceManager.bagFromMesosResources(config.getExecutor().getResourcesList());
+  public Optional<ResourceBag> getExecutorOverhead(String name) {
+    if (config.containsKey(name)) {
+      return Optional.of(
+          ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList()));
+    } else {
+      return Optional.empty();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoader.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoader.java
index b74edf4..7dc9eff 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoader.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoader.java
@@ -16,7 +16,9 @@ package org.apache.aurora.scheduler.configuration.executor;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -24,6 +26,7 @@ import com.google.common.io.CharStreams;
 import com.google.protobuf.UninitializedMessageException;
 import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
 
+import org.apache.aurora.GuavaUtils;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.Volume;
@@ -55,10 +58,10 @@ public final class ExecutorSettingsLoader {
    * Reads an executor configuration from a JSON-encoded source.
    *
    * @param input The configuration data source.
-   * @return An executor configuration.
+   * @return A map of executor configurations.
    * @throws ExecutorConfigException If the input cannot be read or is not properly formatted.
    */
-  public static ExecutorConfig read(Readable input) throws ExecutorConfigException {
+  public static Map<String, ExecutorConfig> read(Readable input) throws ExecutorConfigException {
     String configContents;
     try {
       configContents = CharStreams.toString(input);
@@ -69,26 +72,31 @@ public final class ExecutorSettingsLoader {
     ObjectMapper mapper = new ObjectMapper()
         .registerModule(new ProtobufModule())
         .setPropertyNamingStrategy(CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
-    Schema parsed;
+    List<Schema> parsed;
     try {
-      parsed = mapper.readValue(configContents, Schema.class);
+      parsed = mapper.readValue(configContents, new TypeReference<List<Schema>>() { });
     } catch (IOException e) {
       throw new ExecutorConfigException(e);
     }
 
-    ExecutorInfo executorInfo;
+    Map<String, ExecutorConfig> customExecutors;
     try {
       // We apply a placeholder value for the executor ID so that we can construct and validate
       // the protobuf schema.  This allows us to catch many validation errors here rather than
       // later on when launching tasks.
-      executorInfo = parsed.executor.setExecutorId(PLACEHOLDER_EXECUTOR_ID).build();
+      customExecutors = parsed.stream().collect(
+          GuavaUtils.toImmutableMap(
+              m -> m.executor.getName(),
+              m -> new ExecutorConfig(
+                  m.executor.setExecutorId(PLACEHOLDER_EXECUTOR_ID).build(),
+                  Optional.fromNullable(m.volumeMounts).or(ImmutableList.of()),
+                  m.taskPrefix)));
+
     } catch (UninitializedMessageException e) {
       throw new ExecutorConfigException(e);
     }
 
-    return new ExecutorConfig(
-        executorInfo,
-        Optional.fromNullable(parsed.volumeMounts).or(ImmutableList.of()));
+    return customExecutors;
   }
 
   /**
@@ -98,6 +106,7 @@ public final class ExecutorSettingsLoader {
   private static class Schema {
     private ExecutorInfo.Builder executor;
     private List<Volume> volumeMounts;
+    private String taskPrefix;
 
     ExecutorInfo.Builder getExecutor() {
       return executor;
@@ -114,5 +123,13 @@ public final class ExecutorSettingsLoader {
     void setVolumeMounts(List<Volume> volumeMounts) {
       this.volumeMounts = volumeMounts;
     }
+
+    String getTaskPrefix() {
+      return taskPrefix;
+    }
+
+    void setTaskPrefix(String taskPrefix) {
+      this.taskPrefix = taskPrefix;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index 0b41dba..3413443 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -22,6 +22,7 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
@@ -35,6 +36,7 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.AcceptedOffer;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IAppcImage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -84,7 +86,6 @@ public interface MesosTaskFactory {
   // TODO(wfarner): Move this class to its own file to reduce visibility to package private.
   class MesosTaskFactoryImpl implements MesosTaskFactory {
     private static final Logger LOG = LoggerFactory.getLogger(MesosTaskFactoryImpl.class);
-    private static final String EXECUTOR_PREFIX = "thermos-";
 
     @VisibleForTesting
     static final String METADATA_LABEL_PREFIX = "org.apache.aurora.metadata.";
@@ -111,8 +112,8 @@ public interface MesosTaskFactory {
     }
 
     @VisibleForTesting
-    static ExecutorID getExecutorId(String taskId) {
-      return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
+    static ExecutorID getExecutorId(String taskId, String taskPrefix) {
+      return ExecutorID.newBuilder().setValue(taskPrefix + taskId).build();
     }
 
     private static String getJobSourceName(IJobKey jobkey) {
@@ -123,6 +124,10 @@ public interface MesosTaskFactory {
       return getJobSourceName(task.getJob());
     }
 
+    private static String getExecutorName(IAssignedTask task) {
+      return task.getTask().getExecutorConfig().getName();
+    }
+
     @VisibleForTesting
     static String getInstanceSourceName(ITaskConfig task, int instanceId) {
       return String.format("%s.%s", getJobSourceName(task), instanceId);
@@ -148,13 +153,21 @@ public interface MesosTaskFactory {
       requireNonNull(offer);
 
       ITaskConfig config = task.getTask();
+
+      // Docker-based tasks don't need executors
+      ResourceBag executorOverhead = ResourceBag.EMPTY;
+      if (config.isSetExecutorConfig()) {
+        executorOverhead =
+            executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY);
+      }
+
       AcceptedOffer acceptedOffer;
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
       try {
         acceptedOffer = AcceptedOffer.create(
             offer,
             task,
-            executorSettings.getExecutorOverhead(),
+            executorOverhead,
             tierManager.getTier(task.getTask()));
       } catch (ResourceManager.InsufficientResourcesException e) {
         throw new SchedulerException(e);
@@ -181,7 +194,8 @@ public interface MesosTaskFactory {
         ExecutorInfo.Builder executorInfoBuilder = configureTaskForExecutor(task, acceptedOffer);
 
         Optional<ContainerInfo.Builder> containerInfoBuilder = configureTaskForImage(
-            task.getTask().getContainer().getMesos());
+            task.getTask().getContainer().getMesos(),
+            getExecutorName(task));
         if (containerInfoBuilder.isPresent()) {
           executorInfoBuilder.setContainer(containerInfoBuilder.get());
         }
@@ -191,11 +205,13 @@ public interface MesosTaskFactory {
         IDockerContainer dockerContainer = config.getContainer().getDocker();
         if (config.isSetExecutorConfig()) {
           ExecutorInfo.Builder execBuilder = configureTaskForExecutor(task, acceptedOffer)
-              .setContainer(getDockerContainerInfo(dockerContainer));
+              .setContainer(getDockerContainerInfo(
+                  dockerContainer,
+                  Optional.of(getExecutorName(task))));
           taskBuilder.setExecutor(execBuilder.build());
         } else {
           LOG.warn("Running Docker-based task without an executor.");
-          taskBuilder.setContainer(getDockerContainerInfo(dockerContainer))
+          taskBuilder.setContainer(getDockerContainerInfo(dockerContainer, Optional.absent()))
               .setCommand(CommandInfo.newBuilder().setShell(false));
         }
       } else {
@@ -205,10 +221,13 @@ public interface MesosTaskFactory {
       if (taskBuilder.hasExecutor()) {
         taskBuilder.setData(ByteString.copyFrom(serializeTask(task)));
       }
+
       return taskBuilder.build();
     }
 
-    private Optional<ContainerInfo.Builder> configureTaskForImage(IMesosContainer mesosContainer) {
+    private Optional<ContainerInfo.Builder> configureTaskForImage(
+        IMesosContainer mesosContainer,
+        String executorName) {
       requireNonNull(mesosContainer);
 
       if (mesosContainer.isSetImage()) {
@@ -245,14 +264,17 @@ public interface MesosTaskFactory {
         return Optional.of(ContainerInfo.newBuilder()
             .setType(ContainerInfo.Type.MESOS)
             .setMesos(mesosContainerBuilder)
-            .addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts())
+            .addAllVolumes(executorSettings.getExecutorConfig(executorName).get().getVolumeMounts())
             .addVolumes(volume));
       }
 
       return Optional.absent();
     }
 
-    private ContainerInfo getDockerContainerInfo(IDockerContainer config) {
+    private ContainerInfo getDockerContainerInfo(
+        IDockerContainer config,
+        Optional<String> executorName) {
+
       Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
           item -> Protos.Parameter.newBuilder().setKey(item.getName())
             .setValue(item.getValue()).build());
@@ -262,7 +284,10 @@ public interface MesosTaskFactory {
       return ContainerInfo.newBuilder()
           .setType(ContainerInfo.Type.DOCKER)
           .setDocker(dockerBuilder.build())
-          .addAllVolumes(executorSettings.getExecutorConfig().getVolumeMounts())
+          .addAllVolumes(
+              executorName.isPresent()
+                  ? executorSettings.getExecutorConfig(executorName.get()).get().getVolumeMounts()
+                  : ImmutableList.of())
           .build();
     }
 
@@ -270,8 +295,13 @@ public interface MesosTaskFactory {
         IAssignedTask task,
         AcceptedOffer acceptedOffer) {
 
-      ExecutorInfo.Builder builder = executorSettings.getExecutorConfig().getExecutor().toBuilder()
-          .setExecutorId(getExecutorId(task.getTaskId()))
+      ExecutorInfo.Builder builder =
+          executorSettings.getExecutorConfig(getExecutorName(task)).get()
+          .getExecutor()
+          .toBuilder()
+          .setExecutorId(getExecutorId(
+              task.getTaskId(),
+              executorSettings.getExecutorConfig(getExecutorName(task)).get().getTaskPrefix()))
           .setLabels(
               Labels.newBuilder().addLabels(
                   Label.newBuilder()
@@ -325,5 +355,6 @@ public interface MesosTaskFactory {
         );
       }
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java
index 1dfa97e..fe54411 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java
@@ -14,7 +14,9 @@
 package org.apache.aurora.scheduler.mesos;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.configuration.executor.Executors;
@@ -35,7 +37,7 @@ public final class TestExecutorSettings {
   }
 
   public static final ExecutorInfo THERMOS_EXECUTOR_INFO = ExecutorInfo.newBuilder()
-      .setName("thermos")
+      .setName(apiConstants.AURORA_EXECUTOR_NAME)
       .setExecutorId(Executors.PLACEHOLDER_EXECUTOR_ID)
       .setCommand(CommandInfo.newBuilder().setValue("thermos_executor.pex")
           .addAllArguments(ImmutableList.of(
@@ -61,19 +63,30 @@ public final class TestExecutorSettings {
       ))
       .build();
 
+  public static final String THERMOS_TASK_PREFIX = "thermos-";
+
   public static final ExecutorConfig THERMOS_CONFIG =
-      new ExecutorConfig(THERMOS_EXECUTOR_INFO, ImmutableList.of());
+      new ExecutorConfig(THERMOS_EXECUTOR_INFO, ImmutableList.of(), THERMOS_TASK_PREFIX);
 
   public static final ExecutorSettings THERMOS_EXECUTOR = new ExecutorSettings(
-      THERMOS_CONFIG,
+      ImmutableMap.<String, ExecutorConfig>builder()
+          .put(apiConstants.AURORA_EXECUTOR_NAME, THERMOS_CONFIG)
+          .build(),
       false /* populate discovery info */);
 
   public static ExecutorSettings thermosOnlyWithOverhead(Iterable<Resource> resources) {
-    ExecutorConfig config = THERMOS_EXECUTOR.getExecutorConfig();
+    ExecutorConfig config =
+        THERMOS_EXECUTOR.getExecutorConfig(THERMOS_EXECUTOR_INFO.getName()).get();
+
     ExecutorInfo.Builder executor = config.getExecutor().toBuilder();
     executor.clearResources().addAllResources(resources);
     return new ExecutorSettings(
-        new ExecutorConfig(executor.build(), config.getVolumeMounts()),
+        ImmutableMap.<String, ExecutorConfig>builder().put(
+            apiConstants.AURORA_EXECUTOR_NAME,
+            new ExecutorConfig(
+                executor.build(),
+                config.getVolumeMounts(),
+                THERMOS_TASK_PREFIX)).build(),
         false /* populate discovery info */);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index ee6fe95..ba49e7a 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -113,12 +113,20 @@ public interface PreemptionVictimFilter {
           @Override
           public ResourceBag apply(PreemptionVictim victim) {
             ResourceBag bag = victim.getResourceBag();
+
+            if (victim.getConfig().isSetExecutorConfig()) {
+              // Be pessimistic about revocable resource available if config is not available
+              bag.add(executorSettings.getExecutorOverhead(
+                  victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY));
+            }
+
             if (tierManager.getTier(victim.getConfig()).isRevocable()) {
               // Revocable task CPU cannot be used for preemption purposes as it's a compressible
               // resource. We can still use RAM, DISK and PORTS as they are not compressible.
               bag = bag.filter(IS_MESOS_REVOCABLE.negate());
             }
-            return bag.add(executorSettings.getExecutorOverhead());
+
+            return bag;
           }
         };
 
@@ -197,6 +205,11 @@ public interface PreemptionVictimFilter {
         return Optional.absent();
       }
 
+      ResourceBag overhead = pendingTask.isSetExecutorConfig()
+          ? executorSettings.getExecutorOverhead(
+              pendingTask.getExecutorConfig().getName()).orElse(EMPTY)
+          : EMPTY;
+
       ResourceBag totalResource = slackResources;
       for (PreemptionVictim victim : sortedVictims) {
         toPreemptTasks.add(victim);
@@ -205,8 +218,7 @@ public interface PreemptionVictimFilter {
             new UnusedResource(totalResource, attributes.get()),
             new ResourceRequest(
                 pendingTask,
-                ResourceManager.bagFromResources(pendingTask.getResources())
-                    .add(executorSettings.getExecutorOverhead()),
+                ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),
                 jobState));
 
         if (vetoes.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 3b84dbc..d266f6a 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -36,6 +36,7 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -140,12 +141,19 @@ public interface TaskScheduler extends EventSubscriber {
         ITaskConfig task = assignedTask.getTask();
         AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
 
+        // Valid Docker tasks can have a container but no executor config
+        ResourceBag overhead = ResourceBag.EMPTY;
+        if (task.isSetExecutorConfig()) {
+          overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+              .orElseThrow(
+                  () -> new IllegalArgumentException("Cannot find executor configuration"));
+        }
+
         boolean launched = assigner.maybeAssign(
             store,
             new ResourceRequest(
                 task,
-                bagFromResources(task.getResources()).add(executorSettings.getExecutorOverhead()),
-                aggregate),
+                bagFromResources(task.getResources()).add(overhead), aggregate),
             TaskGroupKey.from(task),
             taskId,
             reservations.asMap());

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
index 1473270..db9f276 100644
--- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java
@@ -33,10 +33,12 @@ import org.apache.aurora.gen.MesosFetcherURI;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
+import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.aurora.scheduler.storage.entities.IDockerParameter;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.junit.Rule;
@@ -78,7 +80,7 @@ public class ConfigurationManagerTest {
               .setJob(JOB_KEY)
               .setIsService(false)
               .setTaskLinks(ImmutableMap.of())
-              .setExecutorConfig(new ExecutorConfig("aurora", "config"))
+              .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config"))
               .setRequestedPorts(ImmutableSet.of())
               .setPriority(0)
               .setOwner(null)
@@ -122,7 +124,8 @@ public class ConfigurationManagerTest {
           false,
           false),
       TaskTestUtil.TIER_MANAGER,
-      TaskTestUtil.THRIFT_BACKFILL);
+      TaskTestUtil.THRIFT_BACKFILL,
+      TestExecutorSettings.THERMOS_EXECUTOR);
   private static final ConfigurationManager DOCKER_CONFIGURATION_MANAGER = new ConfigurationManager(
       new ConfigurationManagerSettings(
           ALL_CONTAINER_TYPES,
@@ -132,7 +135,8 @@ public class ConfigurationManagerTest {
           true,
           true),
       TaskTestUtil.TIER_MANAGER,
-      TaskTestUtil.THRIFT_BACKFILL);
+      TaskTestUtil.THRIFT_BACKFILL,
+      TestExecutorSettings.THERMOS_EXECUTOR);
 
   @Test
   public void testIsGoodIdentifier() {
@@ -294,7 +298,8 @@ public class ConfigurationManagerTest {
             false,
             false),
         TaskTestUtil.TIER_MANAGER,
-        TaskTestUtil.THRIFT_BACKFILL).validateAndPopulate(ITaskConfig.build(builder));
+        TaskTestUtil.THRIFT_BACKFILL,
+        TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder));
   }
 
   @Test
@@ -315,7 +320,8 @@ public class ConfigurationManagerTest {
                     false,
                     false),
             TaskTestUtil.TIER_MANAGER,
-            TaskTestUtil.THRIFT_BACKFILL).validateAndPopulate(ITaskConfig.build(builder));
+            TaskTestUtil.THRIFT_BACKFILL,
+            TestExecutorSettings.THERMOS_EXECUTOR).validateAndPopulate(ITaskConfig.build(builder));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoaderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoaderTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoaderTest.java
index 7de7c46..b8cfbaa 100644
--- a/src/test/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoaderTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettingsLoaderTest.java
@@ -15,10 +15,13 @@ package org.apache.aurora.scheduler.configuration.executor;
 
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.util.Map;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettingsLoader.ExecutorConfigException;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.mesos.Protos.Volume;
@@ -28,34 +31,66 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 
 public class ExecutorSettingsLoaderTest {
-  private static final ExecutorConfig THERMOS_CONFIG = new ExecutorConfig(
-      TestExecutorSettings.THERMOS_CONFIG.getExecutor(),
-      ImmutableList.of(
-          Volume.newBuilder()
-              .setHostPath("/path/to/observer_root")
-              .setContainerPath("/path/to/observer_root")
-              .setMode(Mode.RO)
-              .build(),
-          Volume.newBuilder()
-              .setHostPath("/host")
-              .setContainerPath("/container")
-              .setMode(Mode.RW)
-              .build()));
+  private static final String EXECUTOR_NAME = apiConstants.AURORA_EXECUTOR_NAME;
+  private static final Map<String, ExecutorConfig> THERMOS_CONFIG_SINGLE =
+      ImmutableMap.<String, ExecutorConfig>builder().put(
+          EXECUTOR_NAME,
+          new ExecutorConfig(
+              TestExecutorSettings.THERMOS_CONFIG.getExecutor(),
+              ImmutableList.of(
+                  Volume.newBuilder()
+                      .setHostPath("/path/to/observer_root")
+                      .setContainerPath("/path/to/observer_root")
+                      .setMode(Mode.RO)
+                      .build(),
+                  Volume.newBuilder()
+                      .setHostPath("/host")
+                      .setContainerPath("/container")
+                      .setMode(Mode.RW)
+                      .build()),
+              TestExecutorSettings.THERMOS_TASK_PREFIX)).build();
 
-  private ExecutorConfig loadResource(String name) throws ExecutorConfigException {
+  private static final Map<String, ExecutorConfig> THERMOS_CONFIG_MULTI =
+      ImmutableMap.<String, ExecutorConfig>builder()
+          .putAll(THERMOS_CONFIG_SINGLE)
+          .put(EXECUTOR_NAME + "_2",
+              new ExecutorConfig(
+                  TestExecutorSettings.THERMOS_CONFIG
+                      .getExecutor()
+                      .toBuilder()
+                      .setName(EXECUTOR_NAME + "_2").build(),
+                  ImmutableList.of(
+                      Volume.newBuilder()
+                          .setHostPath("/path/to/observer_root2")
+                          .setContainerPath("/path/to/observer_root2")
+                          .setMode(Mode.RO)
+                          .build(),
+                      Volume.newBuilder()
+                          .setHostPath("/host2")
+                          .setContainerPath("/container2")
+                          .setMode(Mode.RW)
+                          .build()),
+                   TestExecutorSettings.THERMOS_TASK_PREFIX + "2-")).build();
+
+  private Map<String, ExecutorConfig> loadResource(String name) throws ExecutorConfigException {
     return ExecutorSettingsLoader.read(
         new InputStreamReader(getClass().getResourceAsStream(name), Charsets.UTF_8));
   }
 
-  private void assertParsedResult(ExecutorConfig expected, String file)
+  private void assertParsedResult(Map<String, ExecutorConfig> expected, String file)
       throws ExecutorConfigException {
 
     assertEquals(expected, loadResource(file));
   }
 
   @Test
-  public void testParse() throws Exception {
-    assertParsedResult(THERMOS_CONFIG, "test-thermos-executor.json");
+  public void testParseSingle() throws Exception {
+    assertParsedResult(THERMOS_CONFIG_SINGLE, "test-single-executor.json");
+  }
+
+  @Test
+  public void testParseMultiple() throws Exception {
+    assertParsedResult(THERMOS_CONFIG_MULTI, "test-multiple-executor.json");
   }
 
   @Test(expected = ExecutorConfigException.class)

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index d34e12f..0cf23df 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -31,6 +31,7 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -622,7 +623,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRamMb(ramMb)
         .setDiskMb(diskMb)
         .setResources(ImmutableSet.of(numCpus(cpus), ramMb(ramMb), diskMb(diskMb)))
-        .setExecutorConfig(new ExecutorConfig("aurora", "config")));
+        .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config")));
   }
 
   private ITaskConfig makeTask(int cpus, long ramMb, long diskMb) {
@@ -635,6 +636,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
   private ResourceBag bag(ITaskConfig task) {
     return ResourceManager.bagFromResources(task.getResources())
-        .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead());
+        .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead(
+            task.getExecutorConfig().getName()).get());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index 0be5e49..7484e8b 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -106,14 +106,16 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
                       ImmutableList.of(new DockerParameter("label", "testparameter")))))));
 
   private static final ExecutorSettings EXECUTOR_SETTINGS_WITH_VOLUMES = new ExecutorSettings(
-      new ExecutorConfig(
-          TestExecutorSettings.THERMOS_EXECUTOR_INFO,
-          ImmutableList.of(
-              Volume.newBuilder()
-                  .setHostPath("/host")
-                  .setContainerPath("/container")
-                  .setMode(Mode.RO)
-                  .build())),
+      ImmutableMap.<String, ExecutorConfig>builder().
+          put(TestExecutorSettings.THERMOS_EXECUTOR_INFO.getName(),
+              new ExecutorConfig(
+                  TestExecutorSettings.THERMOS_EXECUTOR_INFO,
+                  ImmutableList.of(
+                      Volume.newBuilder()
+                          .setHostPath("/host")
+                          .setContainerPath("/container")
+                          .setMode(Mode.RO).build()),
+                  TestExecutorSettings.THERMOS_TASK_PREFIX)).build(),
       false /* populate discovery info */);
 
   private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
@@ -123,13 +125,15 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
       .setSlaveId(SLAVE)
       .setHostname("slave-hostname")
       .addAllResources(mesosScalarFromBag(bagFromResources(
-              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead())))
+              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(
+                  TASK_CONFIG.getExecutorConfig().getName()).get())))
       .addResources(mesosRange(PORTS, 80))
       .build();
   private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
       .clearResources()
       .addAllResources(mesosScalarFromBag(bagFromResources(
-          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead())))
+          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(
+              TASK_CONFIG.getExecutorConfig().getName()).get())))
       .addResources(mesosRange(PORTS, 80))
       .build();
 
@@ -152,7 +156,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
   private static ExecutorInfo populateDynamicFields(ExecutorInfo executor, IAssignedTask task) {
     return executor.toBuilder()
         .clearResources()
-        .setExecutorId(MesosTaskFactoryImpl.getExecutorId(task.getTaskId()))
+        .setExecutorId(MesosTaskFactoryImpl.getExecutorId(
+            task.getTaskId(),
+            THERMOS_EXECUTOR.getExecutorConfig(executor.getName()).get().getTaskPrefix()))
         .setLabels(
             Protos.Labels.newBuilder().addLabels(
                 Protos.Label.newBuilder()
@@ -264,7 +270,11 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     TaskInfo task = taskFactory.createFrom(TASK, OFFER_THERMOS_EXECUTOR);
     assertEquals(
         purgeZeroResources(populateDynamicFields(
-            NO_OVERHEAD_EXECUTOR.getExecutorConfig().getExecutor(), TASK)),
+            NO_OVERHEAD_EXECUTOR.getExecutorConfig(TASK.getTask()
+                .getExecutorConfig()
+                .getName()).get()
+                .getExecutor(),
+            TASK)),
         makeComparable(task.getExecutor()));
 
     // Simulate the upsizing needed for the task to meet the minimum thermos requirements.
@@ -279,7 +289,8 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
         bagFromMesosResources(taskInfo.getExecutor().getResourcesList());
 
     assertEquals(
-        bagFromResources(task.getResources()).add(config.getExecutorOverhead()),
+        bagFromResources(task.getResources()).add(
+            config.getExecutorOverhead(task.getExecutorConfig().getName()).get()),
         taskResources.add(executorResources));
   }
 
@@ -350,7 +361,8 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
     TaskInfo taskInfo = taskFactory.createFrom(TASK_WITH_DOCKER, OFFER_THERMOS_EXECUTOR);
     assertEquals(
-        config.getExecutorConfig().getVolumeMounts(),
+        config.getExecutorConfig(TASK_WITH_DOCKER.getTask().getExecutorConfig().getName()).get()
+            .getVolumeMounts(),
         taskInfo.getExecutor().getContainer().getVolumesList());
   }
 
@@ -397,7 +409,10 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
   @Test
   public void testPopulateDiscoveryInfoNoPort() {
-    config = new ExecutorSettings(THERMOS_CONFIG, true /* populate discovery info */);
+    config = new ExecutorSettings(
+        ImmutableMap.<String, ExecutorConfig>builder().put(THERMOS_CONFIG.getExecutor().getName(),
+            THERMOS_CONFIG).build(),
+        true /* populate discovery info */);
     AssignedTask builder = TASK.newBuilder();
     builder.unsetAssignedPorts();
     builder.setTask(
@@ -415,7 +430,10 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
 
   @Test
   public void testPopulateDiscoveryInfo() {
-    config = new ExecutorSettings(THERMOS_CONFIG, true /* populate discovery info */);
+    config = new ExecutorSettings(
+        ImmutableMap.<String, ExecutorConfig>builder().put(THERMOS_CONFIG.getExecutor().getName(),
+            THERMOS_CONFIG).build(),
+        true /* populate discovery info */);
     expect(tierManager.getTier(TASK_CONFIG)).andReturn(DEV_TIER);
     taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO);
 
@@ -443,13 +461,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
         EXECUTOR_SETTINGS_WITH_VOLUMES,
         tierManager,
         SERVER_INFO);
-
     TaskInfo task = taskFactory.createFrom(taskWithDockerImage, OFFER_THERMOS_EXECUTOR);
     assertEquals(
         ContainerInfo.newBuilder()
             .setType(Type.MESOS)
             .setMesos(MesosInfo.newBuilder())
-            .addAllVolumes(EXECUTOR_SETTINGS_WITH_VOLUMES.getExecutorConfig().getVolumeMounts())
+            .addAllVolumes(EXECUTOR_SETTINGS_WITH_VOLUMES.getExecutorConfig(
+                TASK.getTask().getExecutorConfig().getName()).get().getVolumeMounts())
             .addVolumes(Volume.newBuilder()
                 .setContainerPath(TASK_FILESYSTEM_MOUNT_POINT)
                 .setImage(Protos.Image.newBuilder()
@@ -485,7 +503,8 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
         ContainerInfo.newBuilder()
             .setType(Type.MESOS)
             .setMesos(MesosInfo.newBuilder())
-            .addAllVolumes(EXECUTOR_SETTINGS_WITH_VOLUMES.getExecutorConfig().getVolumeMounts())
+            .addAllVolumes(EXECUTOR_SETTINGS_WITH_VOLUMES.getExecutorConfig(
+                TASK.getTask().getExecutorConfig().getName()).get().getVolumeMounts())
             .addVolumes(Volume.newBuilder()
                 .setContainerPath(TASK_FILESYSTEM_MOUNT_POINT)
                 .setImage(Protos.Image.newBuilder()

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index 7eb1714..ee5c652 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -28,12 +28,14 @@ import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TierInfo;
 import org.apache.aurora.scheduler.TierManager;
@@ -639,7 +641,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest {
             .setJob(new JobKey(role, env, job))
             .setPriority(priority)
             .setProduction(production)
-            .setConstraints(Sets.newHashSet()));
+            .setConstraints(Sets.newHashSet())
+            .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "config")));
     return new ScheduledTask().setAssignedTask(assignedTask);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index fba427b..72562e6 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -128,7 +128,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private ResourceBag bag(IScheduledTask task) {
     return ResourceManager.bagFromResources(task.getAssignedTask().getTask().getResources())
-        .add(THERMOS_EXECUTOR.getExecutorOverhead());
+        .add(THERMOS_EXECUTOR.getExecutorOverhead(task.getAssignedTask()
+            .getTask()
+            .getExecutorConfig()
+            .getName()).get());
   }
 
   private IExpectationSetters<Boolean> expectAssigned(

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
index a79b0f4..95b3716 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/Fixtures.java
@@ -43,6 +43,7 @@ import org.apache.aurora.gen.ResponseDetail;
 import org.apache.aurora.gen.Result;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
@@ -102,7 +103,7 @@ final class Fixtures {
         .setJob(JOB_KEY.newBuilder())
         .setOwner(IDENTITY)
         .setContactEmail("testing@twitter.com")
-        .setExecutorConfig(new ExecutorConfig("aurora", "data"))
+        .setExecutorConfig(new ExecutorConfig(apiConstants.AURORA_EXECUTOR_NAME, "data"))
         .setNumCpus(1)
         .setRamMb(1024)
         .setDiskMb(1024)

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index c0fe843..779dc30 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -70,10 +70,12 @@ import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
 import org.apache.aurora.scheduler.cron.CronException;
@@ -166,6 +168,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final String AUDIT_MESSAGE = "message";
   private static final AuditData AUDIT = new AuditData(USER, Optional.of(AUDIT_MESSAGE));
   private static final Thresholds THRESHOLDS = new Thresholds(1000, 2000);
+  private static final String EXECUTOR_NAME = apiConstants.AURORA_EXECUTOR_NAME;
 
   private StorageTestUtil storageUtil;
   private LockManager lockManager;
@@ -413,7 +416,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testCreateJobFailsNoExecutorConfig() throws Exception {
+  public void testCreateJobFailsNoExecutorOrContainerConfig() throws Exception {
     JobConfiguration job = makeJob();
     job.getTaskConfig().unsetExecutorConfig();
 
@@ -421,8 +424,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     Response response = thrift.createJob(job);
     assertResponse(INVALID_REQUEST, response);
-    // TODO(wfarner): Don't rely on a magic string here, reference a constant from the source.
-    assertMessageMatches(response, "Configuration may not be null");
+    assertMessageMatches(response, ConfigurationManager.NO_EXECUTOR_OR_CONTAINER);
   }
 
   @Test
@@ -484,7 +486,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testCreateJobPopulateDefaults() throws Exception {
     TaskConfig task = new TaskConfig()
         .setContactEmail("testing@twitter.com")
-        .setExecutorConfig(new ExecutorConfig("aurora", "config"))  // Arbitrary opaque data.
+        .setExecutorConfig(
+            new ExecutorConfig(EXECUTOR_NAME, "config")) // Arbitrary opaque data.
         .setNumCpus(1.0)
         .setRamMb(1024)
         .setDiskMb(1024)
@@ -567,7 +570,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
             .setInstanceId(instanceId)
             .setTask(populatedTask()
                 .setIsService(true)
-                .setExecutorConfig(new ExecutorConfig().setData(executorData)))));
+                .setExecutorConfig(new ExecutorConfig().setName(EXECUTOR_NAME)
+                    .setData(executorData)))));
   }
 
   private IScheduledTask buildScheduledTask() {
@@ -1040,7 +1044,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testRewriteShardCasMismatch() throws Exception {
     TaskConfig storedConfig = productionTask();
     TaskConfig modifiedConfig =
-        storedConfig.deepCopy().setExecutorConfig(new ExecutorConfig("aurora", "rewritten"));
+        storedConfig.deepCopy().setExecutorConfig(new ExecutorConfig(EXECUTOR_NAME, "rewritten"));
     IScheduledTask storedTask = IScheduledTask.build(
         new ScheduledTask().setAssignedTask(new AssignedTask().setTask(storedConfig)));
     InstanceKey instance = new InstanceKey(storedConfig.getJob(), 0);
@@ -1060,7 +1064,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testRewriteInstance() throws Exception {
     TaskConfig storedConfig = productionTask();
     ITaskConfig modifiedConfig = ITaskConfig.build(
-        storedConfig.deepCopy().setExecutorConfig(new ExecutorConfig("aurora", "rewritten")));
+        storedConfig.deepCopy().setExecutorConfig(new ExecutorConfig(EXECUTOR_NAME, "rewritten")));
     String taskId = "task_id";
     IScheduledTask storedTask = IScheduledTask.build(new ScheduledTask().setAssignedTask(
         new AssignedTask()
@@ -1109,7 +1113,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testRewriteJobCasMismatch() throws Exception {
     JobConfiguration oldJob = makeJob(productionTask());
     JobConfiguration newJob = oldJob.deepCopy();
-    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig("aurora", "rewritten"));
+    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig(EXECUTOR_NAME, "rewritten"));
     expect(storageUtil.jobStore.fetchJob(IJobKey.build(oldJob.getKey())))
         .andReturn(Optional.of(IJobConfiguration.build(oldJob)));
 
@@ -1125,7 +1129,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testRewriteJobNotFound() throws Exception {
     JobConfiguration oldJob = makeJob(productionTask());
     JobConfiguration newJob = oldJob.deepCopy();
-    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig("aurora", "rewritten"));
+    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig(EXECUTOR_NAME, "rewritten"));
     expect(storageUtil.jobStore.fetchJob(IJobKey.build(oldJob.getKey())))
         .andReturn(Optional.absent());
 
@@ -1141,7 +1145,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testRewriteJob() throws Exception {
     JobConfiguration oldJob = makeJob(productionTask());
     JobConfiguration newJob = oldJob.deepCopy();
-    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig("aurora", "rewritten"));
+    newJob.getTaskConfig().setExecutorConfig(new ExecutorConfig(EXECUTOR_NAME, "rewritten"));
     expect(storageUtil.jobStore.fetchJob(IJobKey.build(oldJob.getKey())))
         .andReturn(Optional.of(IJobConfiguration.build(oldJob)));
     storageUtil.jobStore.saveAcceptedJob(validateAndPopulate(IJobConfiguration.build(newJob)));

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-missing-field.json
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-missing-field.json b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-missing-field.json
index 4646170..e914b95 100644
--- a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-missing-field.json
+++ b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-missing-field.json
@@ -1,4 +1,6 @@
-{
-  "executor": {
+[
+  {
+    "executor": {
+    }
   }
-}
+]

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-multiple-executor.json
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-multiple-executor.json b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-multiple-executor.json
new file mode 100644
index 0000000..36b9a5d
--- /dev/null
+++ b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-multiple-executor.json
@@ -0,0 +1,100 @@
+[
+  {
+    "executor": {
+      "name": "AuroraExecutor",
+      "command": {
+        "value": "thermos_executor.pex",
+        "arguments": [
+          "--announcer-ensemble",
+          "localhost:2181"
+        ],
+        "uris": [
+          {
+            "value": "/home/vagrant/aurora/dist/thermos_executor.pex",
+            "executable": true,
+            "extract": false,
+            "cache": false
+          }
+        ]
+      },
+      "resources": [
+        {
+          "name": "cpus",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 0.25
+          }
+        },
+        {
+          "name": "mem",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 128
+          }
+        }
+      ]
+    },
+    "volume_mounts": [
+      {
+        "mode": "RO",
+        "container_path": "/path/to/observer_root",
+        "host_path": "/path/to/observer_root"
+      },
+      {
+        "mode": "RW",
+        "container_path": "/container",
+        "host_path": "/host"
+      }
+    ],
+    "task_prefix" : "thermos-"
+  },
+  {
+    "executor": {
+      "name": "AuroraExecutor_2",
+      "command": {
+        "value": "thermos_executor.pex",
+        "arguments": [
+          "--announcer-ensemble",
+          "localhost:2181"
+        ],
+        "uris": [
+          {
+            "value": "/home/vagrant/aurora/dist/thermos_executor.pex",
+            "executable": true,
+            "extract": false,
+            "cache": false
+          }
+        ]
+      },
+      "resources": [
+        {
+          "name": "cpus",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 0.25
+          }
+        },
+        {
+          "name": "mem",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 128
+          }
+        }
+      ]
+    },
+    "volume_mounts": [
+      {
+        "mode": "RO",
+        "container_path": "/path/to/observer_root2",
+        "host_path": "/path/to/observer_root2"
+      },
+      {
+        "mode": "RW",
+        "container_path": "/container2",
+        "host_path": "/host2"
+      }
+    ],
+    "task_prefix" : "thermos-2-"
+  }
+]

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-single-executor.json
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-single-executor.json b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-single-executor.json
new file mode 100644
index 0000000..d94e194
--- /dev/null
+++ b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-single-executor.json
@@ -0,0 +1,51 @@
+[
+  {
+    "executor": {
+      "name": "AuroraExecutor",
+      "command": {
+        "value": "thermos_executor.pex",
+        "arguments": [
+          "--announcer-ensemble",
+          "localhost:2181"
+        ],
+        "uris": [
+          {
+            "value": "/home/vagrant/aurora/dist/thermos_executor.pex",
+            "executable": true,
+            "extract": false,
+            "cache": false
+          }
+        ]
+      },
+      "resources": [
+        {
+          "name": "cpus",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 0.25
+          }
+        },
+        {
+          "name": "mem",
+          "type": "SCALAR",
+          "scalar": {
+            "value": 128
+          }
+        }
+      ]
+    },
+    "volume_mounts": [
+      {
+        "mode": "RO",
+        "container_path": "/path/to/observer_root",
+        "host_path": "/path/to/observer_root"
+      },
+      {
+        "mode": "RW",
+        "container_path": "/container",
+        "host_path": "/host"
+      }
+    ],
+    "task_prefix" : "thermos-"
+  }
+]

http://git-wip-us.apache.org/repos/asf/aurora/blob/d0533d2c/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-thermos-executor.json
----------------------------------------------------------------------
diff --git a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-thermos-executor.json b/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-thermos-executor.json
deleted file mode 100644
index 114eb4f..0000000
--- a/src/test/resources/org/apache/aurora/scheduler/configuration/executor/test-thermos-executor.json
+++ /dev/null
@@ -1,48 +0,0 @@
-{
-  "executor": {
-    "name": "thermos",
-    "command": {
-      "value": "thermos_executor.pex",
-      "arguments": [
-        "--announcer-ensemble",
-        "localhost:2181"
-      ],
-      "uris": [
-        {
-          "value": "/home/vagrant/aurora/dist/thermos_executor.pex",
-          "executable": true,
-          "extract": false,
-          "cache": false
-        }
-      ]
-    },
-    "resources": [
-      {
-        "name": "cpus",
-        "type": "SCALAR",
-        "scalar": {
-          "value": 0.25
-        }
-      },
-      {
-        "name": "mem",
-        "type": "SCALAR",
-        "scalar": {
-          "value": 128
-        }
-      }
-    ]
-  },
-  "volume_mounts": [
-    {
-      "mode": "RO",
-      "container_path": "/path/to/observer_root",
-      "host_path": "/path/to/observer_root"
-    },
-    {
-      "mode": "RW",
-      "container_path": "/container",
-      "host_path": "/host"
-    }
-  ]
-}