You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/11/22 02:00:54 UTC
incubator-aurora git commit: Prevent Aurora from creating zero sized
Executor tasks.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 5116c2209 -> a431b1d6b
Prevent Aurora from creating zero sized Executor tasks.
Bugs closed: AURORA-928
Reviewed at https://reviews.apache.org/r/28193/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a431b1d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a431b1d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a431b1d6
Branch: refs/heads/master
Commit: a431b1d6b98c514448346a9c6af0c90951db9b4d
Parents: 5116c22
Author: Zameer Manji <zm...@twopensource.com>
Authored: Fri Nov 21 17:00:37 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Nov 21 17:00:37 2014 -0800
----------------------------------------------------------------------
.../apache/aurora/scheduler/ResourceSlot.java | 35 +++----
.../aurora/scheduler/app/SchedulerMain.java | 16 ++-
.../scheduler/mesos/MesosTaskFactory.java | 94 ++++++++++++++---
.../aurora/scheduler/app/SchedulerIT.java | 14 ++-
.../mesos/MesosTaskFactoryImplTest.java | 104 ++++++++++++++++---
5 files changed, 210 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index 46832d5..0b15834 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -38,34 +38,32 @@ public final class ResourceSlot {
private final Resources resources;
/**
- * CPU allocated for each executor.
+ * Extra CPU allocated for each executor.
*/
@VisibleForTesting
@CmdLine(name = "thermos_executor_cpu",
help = "The number of CPU cores to allocate for each instance of the executor.")
- public static final Arg<Double> EXECUTOR_CPUS = Arg.create(0.25);
+ public static final Arg<Double> EXECUTOR_OVERHEAD_CPUS = Arg.create(0.25);
/**
- * RAM required for the executor. Executors in the wild have been observed using 48-54MB RSS,
- * setting to 128MB to be extra vigilant initially.
+ * Extra RAM allocated for the executor.
*/
@VisibleForTesting
@CmdLine(name = "thermos_executor_ram",
help = "The amount of RAM to allocate for each instance of the executor.")
- public static final Arg<Amount<Integer, Data>> EXECUTOR_RAM =
- Arg.create(Amount.of(128, Data.MB));
+ public static final Arg<Amount<Long, Data>> EXECUTOR_OVERHEAD_RAM =
+ Arg.create(Amount.of(128L, Data.MB));
private ResourceSlot(Resources r) {
this.resources = r;
}
public static ResourceSlot from(ITaskConfig task) {
- double totalCPU = task.getNumCpus() + EXECUTOR_CPUS.get();
- Amount<Long, Data> totalRAM =
- Amount.of(task.getRamMb() + EXECUTOR_RAM.get().as(Data.MB), Data.MB);
- Amount<Long, Data> disk = Amount.of(task.getDiskMb(), Data.MB);
- return new ResourceSlot(
- new Resources(totalCPU, totalRAM, disk, task.getRequestedPorts().size()));
+ return from(
+ task.getNumCpus(),
+ Amount.of(task.getRamMb(), Data.MB),
+ Amount.of(task.getDiskMb(), Data.MB),
+ task.getRequestedPorts().size());
}
public static ResourceSlot from(Offer offer) {
@@ -89,13 +87,14 @@ public final class ResourceSlot {
}
@VisibleForTesting
- public static ResourceSlot from(double cpu,
- Amount<Long, Data> ram,
- Amount<Long, Data> disk,
- int ports) {
- double totalCPU = cpu + EXECUTOR_CPUS.get();
+ public static ResourceSlot from(
+ double cpu,
+ Amount<Long, Data> ram,
+ Amount<Long, Data> disk,
+ int ports) {
+ double totalCPU = cpu + EXECUTOR_OVERHEAD_CPUS.get();
Amount<Long, Data> totalRAM =
- Amount.of(ram.as(Data.MB) + EXECUTOR_RAM.get().as(Data.MB), Data.MB);
+ Amount.of(ram.as(Data.MB) + EXECUTOR_OVERHEAD_RAM.get().as(Data.MB), Data.MB);
return new ResourceSlot(new Resources(totalCPU, totalRAM, disk, ports));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 288e7cb..72c7545 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -35,6 +35,8 @@ import com.twitter.common.args.constraints.NotEmpty;
import com.twitter.common.args.constraints.NotNull;
import com.twitter.common.inject.Bindings;
import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.SingletonService;
import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
@@ -46,11 +48,12 @@ import org.apache.aurora.auth.CapabilityValidator;
import org.apache.aurora.auth.SessionValidator;
import org.apache.aurora.auth.UnsecureAuthModule;
import org.apache.aurora.scheduler.SchedulerLifecycle;
+import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.db.DbModule;
import org.apache.aurora.scheduler.storage.db.MigrationModule;
@@ -64,6 +67,9 @@ import org.apache.aurora.scheduler.thrift.auth.ThriftAuthModule;
import static com.twitter.common.logging.RootLogConfig.Configuration;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_CPUS;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_RAM;
+
/**
* Launcher for the aurora scheduler.
*/
@@ -178,7 +184,13 @@ public class SchedulerMain extends AbstractApplication {
.add(new AbstractModule() {
@Override
protected void configure() {
- bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
+ Resources executorOverhead = new Resources(
+ EXECUTOR_OVERHEAD_CPUS.get(),
+ EXECUTOR_OVERHEAD_RAM.get(),
+ Amount.of(0L, Data.MB),
+ 0);
+ bind(ExecutorSettings.class)
+ .toInstance(new ExecutorSettings(THERMOS_EXECUTOR_PATH.get(), executorOverhead));
}
})
.add(getMesosModules())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index bb227fd..e0332c0 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -23,11 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
+import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.CommandUtil;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.SchedulerException;
@@ -62,16 +62,22 @@ public interface MesosTaskFactory {
*/
TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
- class ExecutorConfig {
+ class ExecutorSettings {
private final String executorPath;
+ private final Resources executorOverhead;
- public ExecutorConfig(String executorPath) {
+ public ExecutorSettings(String executorPath, Resources executorOverhead) {
this.executorPath = checkNotBlank(executorPath);
+ this.executorOverhead = requireNonNull(executorOverhead);
}
String getExecutorPath() {
return executorPath;
}
+
+ Resources getExecutorOverhead() {
+ return executorOverhead;
+ }
}
// TODO(wfarner): Move this class to its own file to reduce visibility to package private.
@@ -80,16 +86,39 @@ public interface MesosTaskFactory {
private static final String EXECUTOR_PREFIX = "thermos-";
/**
+ * Minimum resources required to run Thermos. In the wild Thermos needs about 0.01 CPU and
+ * about 100MB of RAM. The RAM requirement has been rounded up to a power of 2.
+ */
+ @VisibleForTesting
+ static final Resources MIN_THERMOS_RESOURCES = new Resources(
+ 0.01,
+ Amount.of(128L, Data.MB),
+ Amount.of(0L, Data.MB),
+ 0);
+
+ /**
+ * Minimum resources to allocate for a task. Mesos rejects tasks that have no CPU or no RAM.
+ */
+ @VisibleForTesting
+ static final Resources MIN_TASK_RESOURCES = new Resources(
+ 0.01,
+ Amount.of(1L, Data.MB),
+ Amount.of(0L, Data.MB),
+ 0);
+
+ /**
* Name to associate with task executors.
*/
@VisibleForTesting
static final String EXECUTOR_NAME = "aurora.task";
private final String executorPath;
+ private final Resources executorOverhead;
@Inject
- MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
- this.executorPath = executorConfig.getExecutorPath();
+ MesosTaskFactoryImpl(ExecutorSettings executorSettings) {
+ this.executorPath = executorSettings.getExecutorPath();
+ this.executorOverhead = executorSettings.getExecutorOverhead();
}
@VisibleForTesting
@@ -97,21 +126,46 @@ public interface MesosTaskFactory {
return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
}
- public static String getJobSourceName(IJobKey jobkey) {
+ private static String getJobSourceName(IJobKey jobkey) {
return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
}
- public static String getJobSourceName(ITaskConfig task) {
+ private static String getJobSourceName(ITaskConfig task) {
return getJobSourceName(task.getJob());
}
- public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+ @VisibleForTesting
+ static String getInstanceSourceName(ITaskConfig task, int instanceId) {
return String.format("%s.%s", getJobSourceName(task), instanceId);
}
+ /**
+ * Generates a Resource where each resource component is a max out of the two components.
+ *
+ * @param a A resource to compare.
+ * @param b A resource to compare.
+ *
+ * @return Returns a Resources instance where each component is a max of the two components.
+ */
+ @VisibleForTesting
+ static Resources maxElements(Resources a, Resources b) {
+ double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus());
+ Amount<Long, Data> maxRAM = Amount.of(
+ Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)),
+ Data.MB);
+ Amount<Long, Data> maxDisk = Amount.of(
+ Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)),
+ Data.MB);
+ int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts());
+
+ return new Resources(maxCPU, maxRAM, maxDisk, maxPorts);
+ }
+
@Override
public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
requireNonNull(task);
+ requireNonNull(slaveId);
+
byte[] taskInBytes;
try {
taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
@@ -120,9 +174,25 @@ public interface MesosTaskFactory {
throw new SchedulerException("Internal error.", e);
}
+ // The objective of the below code is to allocate a task and executor that is in a container
+ // of task + executor overhead size. Mesos stipulates that we cannot allocate 0 sized tasks or
+ // executors and we should always ensure the ExecutorInfo has enough resources to launch or
+ // run an executor. Therefore the total desired container size (task + executor overhead) is
+ // partitioned to a small portion that is always allocated to the executor and the rest to the
+ // task. If the remaining resources are not enough for the task a small epsilon is allocated
+ // to the task.
+
ITaskConfig config = task.getTask();
+ Resources taskResources = Resources.from(config);
+ Resources containerResources = Resources.sum(taskResources, executorOverhead);
+
+ taskResources = Resources.subtract(containerResources, MIN_THERMOS_RESOURCES);
+ // It is possible that the final task resources will be negative.
+ // This ensures the task resources are positive.
+ Resources finalTaskResources = maxElements(taskResources, MIN_TASK_RESOURCES);
+
// TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
- List<Resource> resources = Resources.from(config)
+ List<Resource> resources = finalTaskResources
.toResourceList(task.isSetAssignedPorts()
? ImmutableSet.copyOf(task.getAssignedPorts().values())
: ImmutableSet.<Integer>of());
@@ -144,11 +214,7 @@ public interface MesosTaskFactory {
.setExecutorId(getExecutorId(task.getTaskId()))
.setName(EXECUTOR_NAME)
.setSource(getInstanceSourceName(config, task.getInstanceId()))
- .addResources(
- Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
- .addResources(Resources.makeMesosResource(
- Resources.RAM_MB,
- ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+ .addAllResources(MIN_THERMOS_RESOURCES.toResourceList())
.build();
return taskBuilder
.setExecutor(executor)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index bf6cfad..5e54364 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -49,6 +49,7 @@ import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
@@ -76,13 +77,14 @@ import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
import org.apache.aurora.scheduler.log.Log.Stream;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.log.EntrySerializer;
import org.apache.aurora.scheduler.storage.log.LogStorageModule;
@@ -104,6 +106,8 @@ import org.junit.Test;
import static com.twitter.common.testing.easymock.EasyMockTest.createCapture;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_CPUS;
+import static org.apache.aurora.scheduler.ResourceSlot.EXECUTOR_OVERHEAD_RAM;
import static org.apache.mesos.Protos.FrameworkInfo;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createControl;
@@ -194,7 +198,13 @@ public class SchedulerIT extends BaseZooKeeperTest {
bind(DriverFactory.class).toInstance(driverFactory);
bind(DriverSettings.class).toInstance(SETTINGS);
bind(Log.class).toInstance(log);
- bind(ExecutorConfig.class).toInstance(new ExecutorConfig("/executor/thermos"));
+ Resources executorOverhead = new Resources(
+ EXECUTOR_OVERHEAD_CPUS.get(),
+ EXECUTOR_OVERHEAD_RAM.get(),
+ Amount.of(0L, Data.MB),
+ 0);
+ bind(ExecutorSettings.class)
+ .toInstance(new ExecutorSettings("/executor/thermos", executorOverhead));
install(new BackupModule(backupDir, SnapshotStoreImpl.class));
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a431b1d6/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index 953c1ed..22fb991 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -15,15 +15,15 @@ package org.apache.aurora.scheduler.mesos;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorSettings;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.mesos.Protos.CommandInfo;
@@ -31,10 +31,12 @@ import org.apache.mesos.Protos.CommandInfo.URI;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl.MIN_TASK_RESOURCES;
+import static org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl.MIN_THERMOS_RESOURCES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class MesosTaskFactoryImplTest {
@@ -53,22 +55,26 @@ public class MesosTaskFactoryImplTest {
.setNumCpus(5)
.setRequestedPorts(ImmutableSet.of("http"))));
private static final SlaveID SLAVE = SlaveID.newBuilder().setValue("slave-id").build();
+ private static final Resources SOME_EXECUTOR_OVERHEAD = new Resources(
+ 0.01,
+ Amount.of(128L, Data.MB),
+ Amount.of(0L, Data.MB),
+ 0);
- private MesosTaskFactory taskFactory;
+ private static final Resources NO_EXECUTOR_OVERHEAD = new Resources(
+ 0,
+ Amount.of(0L, Data.MB),
+ Amount.of(0L, Data.MB),
+ 0);
- @Before
- public void setUp() {
- taskFactory = new MesosTaskFactoryImpl(new ExecutorConfig(EXECUTOR_PATH));
- }
+ private MesosTaskFactory taskFactory;
+ private ExecutorSettings config;
private static final ExecutorInfo DEFAULT_EXECUTOR = ExecutorInfo.newBuilder()
.setExecutorId(MesosTaskFactoryImpl.getExecutorId(TASK.getTaskId()))
.setName(MesosTaskFactoryImpl.EXECUTOR_NAME)
.setSource(MesosTaskFactoryImpl.getInstanceSourceName(TASK.getTask(), TASK.getInstanceId()))
- .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
- .addResources(Resources.makeMesosResource(
- Resources.RAM_MB,
- ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
+ .addAllResources(MIN_THERMOS_RESOURCES.toResourceList())
.setCommand(CommandInfo.newBuilder()
.setValue("./executor.sh")
.addUris(URI.newBuilder().setValue(EXECUTOR_PATH).setExecutable(true)))
@@ -76,12 +82,24 @@ public class MesosTaskFactoryImplTest {
@Test
public void testExecutorInfoUnchanged() {
+ config = new ExecutorSettings(EXECUTOR_PATH, SOME_EXECUTOR_OVERHEAD);
+ taskFactory = new MesosTaskFactoryImpl(config);
TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
+ double taskCPU = config.getExecutorOverhead().getNumCpus()
+ + TASK.getTask().getNumCpus()
+ - MIN_THERMOS_RESOURCES.getNumCpus();
+ long taskRamMB = config.getExecutorOverhead().getRam().as(Data.MB)
+ + TASK.getTask().getRamMb()
+ - MIN_THERMOS_RESOURCES.getRam().as(Data.MB);
+
+ assertTrue(taskCPU > 0.0);
+ assertTrue(taskRamMB > 0);
+
assertEquals(ImmutableSet.of(
- Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
- Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
+ Resources.makeMesosResource(Resources.CPUS, taskCPU),
Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+ Resources.makeMesosResource(Resources.RAM_MB, taskRamMB),
Resources.makeMesosRangeResource(
Resources.PORTS,
ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
@@ -91,15 +109,67 @@ public class MesosTaskFactoryImplTest {
@Test
public void testCreateFromPortsUnset() {
+ config = new ExecutorSettings(EXECUTOR_PATH, SOME_EXECUTOR_OVERHEAD);
+ taskFactory = new MesosTaskFactoryImpl(config);
AssignedTask assignedTask = TASK.newBuilder();
assignedTask.unsetAssignedPorts();
TaskInfo task = taskFactory.createFrom(IAssignedTask.build(assignedTask), SLAVE);
+
+ double taskCPU = config.getExecutorOverhead().getNumCpus()
+ + TASK.getTask().getNumCpus()
+ - MIN_THERMOS_RESOURCES.getNumCpus();
+ long taskRamMB = config.getExecutorOverhead().getRam().as(Data.MB)
+ + TASK.getTask().getRamMb()
+ - MIN_THERMOS_RESOURCES.getRam().as(Data.MB);
+
+ assertTrue(taskCPU > 0.0);
+ assertTrue(taskRamMB > 0);
+
assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
assertEquals(ImmutableSet.of(
- Resources.makeMesosResource(Resources.CPUS, TASK.getTask().getNumCpus()),
- Resources.makeMesosResource(Resources.RAM_MB, TASK.getTask().getRamMb()),
- Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb())
+ Resources.makeMesosResource(Resources.CPUS, taskCPU),
+ Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+ Resources.makeMesosResource(Resources.RAM_MB, taskRamMB)
),
ImmutableSet.copyOf(task.getResourcesList()));
}
+
+ @Test
+ public void testExecutorInfoNoOverhead() {
+ // Here the ram required for the executor is greater than the sum of task resources
+ // + executor overhead. We need to ensure we allocate a non-zero amount of ram in this case.
+ config = new ExecutorSettings(EXECUTOR_PATH, NO_EXECUTOR_OVERHEAD);
+ taskFactory = new MesosTaskFactoryImpl(config);
+ TaskInfo task = taskFactory.createFrom(TASK, SLAVE);
+ assertEquals(DEFAULT_EXECUTOR, task.getExecutor());
+
+ double taskCPU = config.getExecutorOverhead().getNumCpus()
+ + TASK.getTask().getNumCpus()
+ - MIN_THERMOS_RESOURCES.getNumCpus();
+
+ assertTrue(taskCPU > 0.0);
+
+ assertEquals(ImmutableSet.of(
+ Resources.makeMesosResource(Resources.CPUS, taskCPU),
+ Resources.makeMesosResource(Resources.RAM_MB, MIN_TASK_RESOURCES.getRam().as(Data.MB)),
+ Resources.makeMesosResource(Resources.DISK_MB, TASK.getTask().getDiskMb()),
+ Resources.makeMesosRangeResource(
+ Resources.PORTS,
+ ImmutableSet.copyOf(TASK.getAssignedPorts().values()))
+ ),
+ ImmutableSet.copyOf(task.getResourcesList()));
+ }
+
+ @Test
+ public void testMaxElements() {
+ Resources highRAM = new Resources(1, Amount.of(8L, Data.GB), Amount.of(10L, Data.MB), 0);
+ Resources rest = new Resources(10, Amount.of(1L, Data.MB), Amount.of(10L, Data.GB), 1);
+
+ Resources result = MesosTaskFactoryImpl.maxElements(highRAM, rest);
+ assertEquals(result.getNumCpus(), 10, 0.001);
+ assertEquals(result.getRam(), Amount.of(8L, Data.GB));
+ assertEquals(result.getDisk(), Amount.of(10L, Data.GB));
+ assertEquals(result.getNumPorts(), 1);
+ }
+
}