You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/11/14 03:01:52 UTC
incubator-aurora git commit: Adding resource consumption calculation
for cron jobs.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 4d4b41aaf -> 316f291e3
Adding resource consumption calculation for cron jobs.
Bugs closed: AURORA-825
Reviewed at https://reviews.apache.org/r/27601/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/316f291e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/316f291e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/316f291e
Branch: refs/heads/master
Commit: 316f291e37747383470b1b0ae047f43098f626f4
Parents: 4d4b41a
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Nov 13 18:01:47 2014 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Nov 13 18:01:47 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/quota/QuotaInfo.java | 29 +++
.../aurora/scheduler/quota/QuotaManager.java | 238 +++++++++++++------
.../scheduler/quota/QuotaManagerImplTest.java | 202 ++++++++++++----
3 files changed, 343 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/316f291e/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
index d4e0f53..3e25812 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
@@ -13,6 +13,8 @@
*/
package org.apache.aurora.scheduler.quota;
+import java.util.Objects;
+
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import static java.util.Objects.requireNonNull;
@@ -61,4 +63,31 @@ public class QuotaInfo {
public IResourceAggregate getNonProdConsumption() {
return nonProdConsumption;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof QuotaInfo)) {
+ return false;
+ }
+
+ QuotaInfo other = (QuotaInfo) o;
+
+ return Objects.equals(quota, other.quota)
+ && Objects.equals(prodConsumption, other.prodConsumption)
+ && Objects.equals(nonProdConsumption, other.nonProdConsumption);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(quota, prodConsumption, nonProdConsumption);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("quota", quota)
+ .add("prodConsumption", prodConsumption)
+ .add("nonProdConsumption", nonProdConsumption)
+ .toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/316f291e/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 3fbab8b..e38407e 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -27,6 +27,7 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
@@ -34,14 +35,16 @@ import com.google.inject.Inject;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ResourceAggregates;
-import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
@@ -55,6 +58,9 @@ import org.apache.aurora.scheduler.updater.JobUpdateController;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.base.Tasks.INFO_TO_JOB_KEY;
+import static org.apache.aurora.scheduler.base.Tasks.IS_PRODUCTION;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_INFO;
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
/**
@@ -112,10 +118,12 @@ public interface QuotaManager {
*/
class QuotaManagerImpl implements QuotaManager {
private final Storage storage;
+ private final CronJobManager cronJobManager;
@Inject
- QuotaManagerImpl(Storage storage) {
+ QuotaManagerImpl(Storage storage, CronJobManager cronJobManager) {
this.storage = requireNonNull(storage);
+ this.cronJobManager = requireNonNull(cronJobManager);
}
@Override
@@ -151,11 +159,11 @@ public interface QuotaManager {
}
QuotaInfo quotaInfo = getQuotaInfo(template.getJob().getRole());
+ IResourceAggregate requestedTotal = add(
+ quotaInfo.getProdConsumption(),
+ ResourceAggregates.scale(fromTasks(ImmutableSet.of(template)), instances));
- return QuotaCheckResult.greaterOrEqual(
- quotaInfo.getQuota(),
- add(quotaInfo.getProdConsumption(), ResourceAggregates.scale(
- prodResourcesFromTasks(ImmutableSet.of(template)), instances)));
+ return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal);
}
@Override
@@ -177,11 +185,11 @@ public interface QuotaManager {
/**
* Gets QuotaInfo with currently allocated quota and actual consumption data.
* <p>
- * In case an optional {@code requestedUpdate} is specified, the production consumption returned
- * also includes an estimated resources share of that update as if it was already in progress.
+ * In case an optional {@code requestedUpdate} is specified, the consumption returned also
+ * includes an estimated resources share of that update as if it was already in progress.
*
* @param role Role to get quota info for.
- * @param requestedUpdate An optional {@code IJobUpdate} to forecast the prod consumption.
+ * @param requestedUpdate An optional {@code IJobUpdate} to forecast the consumption.
* @return {@code QuotaInfo} with quota and consumption details.
*/
private QuotaInfo getQuotaInfo(final String role, final Optional<IJobUpdate> requestedUpdate) {
@@ -191,12 +199,25 @@ public interface QuotaManager {
FluentIterable<IScheduledTask> tasks = FluentIterable.from(
storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
- IResourceAggregate prodConsumed =
- getProdConsumption(storeProvider.getJobUpdateStore(), role, tasks, requestedUpdate);
+ Map<IJobKey, IJobUpdate> updates = Maps.newHashMap(
+ fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role)
+ .uniqueIndex(UPDATE_TO_JOB_KEY));
- // TODO(maxim): Consider a similar update-aware approach for computing nonProdConsumed.
- IResourceAggregate nonProdConsumed = fromTasks(
- tasks.transform(Tasks.SCHEDULED_TO_INFO).filter(Predicates.not(Tasks.IS_PRODUCTION)));
+ // Mix in a requested job update (if present) to correctly calculate consumption.
+ // This would be an update that is not saved in the store yet (i.e. the one quota is
+ // checked for).
+ if (requestedUpdate.isPresent()) {
+ updates.put(requestedUpdate.get().getSummary().getJobKey(), requestedUpdate.get());
+ }
+
+ Map<IJobKey, IJobConfiguration> cronTemplates =
+ FluentIterable.from(cronJobManager.getJobs())
+ .filter(Predicates.compose(Predicates.equalTo(role), JobKeys.CONFIG_TO_ROLE))
+ .uniqueIndex(JobKeys.FROM_CONFIG);
+
+ IResourceAggregate prodConsumed = getConsumption(tasks, updates, cronTemplates, true);
+
+ IResourceAggregate nonProdConsumed = getConsumption(tasks, updates, cronTemplates, false);
IResourceAggregate quota =
storeProvider.getQuotaStore().fetchQuota(role).or(ResourceAggregates.none());
@@ -206,47 +227,99 @@ public interface QuotaManager {
});
}
- private IResourceAggregate getProdConsumption(
- JobUpdateStore jobUpdateStore,
- String role,
+ private IResourceAggregate getConsumption(
FluentIterable<IScheduledTask> tasks,
- Optional<IJobUpdate> requestedUpdate) {
-
- // The algorithm here is as follows:
- // 1. Load all production active tasks that belong to jobs without active updates OR
- // unaffected by an active update working set. An example of the latter would be instances
- // not updated by the update due to being already in desired state or outside of update
- // range (e.g. not in JobUpdateInstructions.updateOnlyTheseInstances).
- // Calculate consumed resources as "nonUpdateConsumption".
- //
- // 2. Mix in a requested job update (if present) to correctly calculate prod consumption.
- // This would be an update that is not saved in the store yet (i.e. the one quota is
- // checked for).
+ Map<IJobKey, IJobUpdate> updatesByKey,
+ Map<IJobKey, IJobConfiguration> cronTemplatesByKey,
+ boolean isProd) {
+
+ Predicate<ITaskConfig> prodFilter = isProd ? IS_PRODUCTION : Predicates.not(IS_PRODUCTION);
+
+ FluentIterable<IScheduledTask> filteredTasks =
+ tasks.filter(Predicates.compose(prodFilter, SCHEDULED_TO_INFO));
+
+ IResourceAggregate nonCronConsumption = getNonCronConsumption(
+ updatesByKey,
+ excludeCronTasks(filteredTasks, cronTemplatesByKey),
+ isProd);
+
+ IResourceAggregate cronConsumption =
+ getCronConsumption(cronTemplatesByKey, filteredTasks, isProd);
+
+ return add(nonCronConsumption, cronConsumption);
+ }
+
+ private static IResourceAggregate getNonCronConsumption(
+ Map<IJobKey, IJobUpdate> updatesByKey,
+ FluentIterable<IScheduledTask> tasks,
+ boolean isProd) {
+
+ // 1. Get all active tasks that belong to jobs without active updates OR unaffected by an
+ // active update working set. An example of the latter would be instances not updated by
+ // the update due to being already in desired state or outside of update range (e.g.
+ // not in JobUpdateInstructions.updateOnlyTheseInstances). Calculate consumed resources
+ // as "nonUpdateConsumption".
//
- // 3. Calculate consumed resources from instances affected by the active job updates as
+ // 2. Calculate consumed resources from instances affected by the active job updates as
// "updateConsumption".
//
- // 4. Add up the two to yield total prod consumption.
+ // 3. Add up the two to yield total consumption.
- Map<IJobKey, IJobUpdate> updatesByKey = Maps.newHashMap(
- fetchActiveJobUpdates(jobUpdateStore, role).uniqueIndex(UPDATE_TO_JOB_KEY));
-
- if (requestedUpdate.isPresent()) {
- updatesByKey.put(requestedUpdate.get().getSummary().getJobKey(), requestedUpdate.get());
- }
-
- IResourceAggregate nonUpdateConsumption = prodResourcesFromTasks(tasks
+ IResourceAggregate nonUpdateConsumption = fromTasks(tasks
.filter(buildNonUpdatingTasksFilter(updatesByKey))
- .transform(Tasks.SCHEDULED_TO_INFO));
+ .transform(SCHEDULED_TO_INFO));
IResourceAggregate updateConsumption = ResourceAggregates.EMPTY;
for (IJobUpdate update : updatesByKey.values()) {
- updateConsumption = add(updateConsumption, toProdResources(update.getInstructions()));
+ updateConsumption =
+ add(updateConsumption, instructionsToResources(update.getInstructions(), isProd));
}
return add(nonUpdateConsumption, updateConsumption);
}
+ private static IResourceAggregate getCronConsumption(
+ Map<IJobKey, IJobConfiguration> cronTemplates,
+ FluentIterable<IScheduledTask> tasks,
+ boolean isProd) {
+
+ // Calculate the overall cron consumption as MAX between cron template resources and active
+ // cron tasks. This is required to account for a case when a running cron task has higher
+ // resource requirements than its updated template.
+ //
+ // While this is the "worst case" calculation that does not account for a possible "staggered"
+ // cron scheduling, it's the simplest approach possible given the system constraints (e.g.:
+ // lack of enforcement on a cron job run duration).
+
+ Multimap<IJobKey, ITaskConfig> taskConfigsByKey =
+ tasks.transform(SCHEDULED_TO_INFO).index(INFO_TO_JOB_KEY);
+
+ IResourceAggregate totalConsumption = ResourceAggregates.EMPTY;
+ for (IJobConfiguration config : cronTemplates.values()) {
+ if (isProd == config.getTaskConfig().isProduction()) {
+ IResourceAggregate templateConsumption = ResourceAggregates.scale(
+ fromTasks(ImmutableSet.of(config.getTaskConfig())), config.getInstanceCount());
+
+ IResourceAggregate taskConsumption = fromTasks(taskConfigsByKey.get(config.getKey()));
+
+ totalConsumption = add(totalConsumption, max(templateConsumption, taskConsumption));
+ }
+ }
+ return totalConsumption;
+ }
+
+ private static FluentIterable<IScheduledTask> excludeCronTasks(
+ FluentIterable<IScheduledTask> tasks,
+ final Map<IJobKey, IJobConfiguration> cronJobs) {
+
+ return tasks.filter(new Predicate<IScheduledTask>() {
+ @Override
+ public boolean apply(IScheduledTask input) {
+ return !cronJobs.containsKey(input.getAssignedTask().getTask().getJob());
+ }
+ });
+ }
+
private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter(
final Map<IJobKey, IJobUpdate> roleJobUpdates) {
@@ -303,53 +376,68 @@ public interface QuotaManager {
return builder.build();
}
- private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
- return IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(a.getNumCpus() + b.getNumCpus())
- .setRamMb(a.getRamMb() + b.getRamMb())
- .setDiskMb(a.getDiskMb() + b.getDiskMb()));
- }
-
/**
- * This function calculates max aggregate production resources consumed by the job update
+ * This function calculates max aggregate resources consumed by the job update
* {@code instructions}. The max is calculated between existing and desired task configs on per
* resource basis. This means max CPU, RAM and DISK values are computed individually and may
* come from different task configurations. While it may not be the most accurate
* representation of job update resources during the update, it does guarantee none of the
* individual resource values is exceeded during the forward/back roll.
*
+ * NOTE: In case of a job update converting the job production bit (i.e. prod -> non-prod or
+ * non-prod -> prod), only the matching state is counted towards consumption. For example,
+ * prod -> non-prod AND {@code prodConsumption=True}: only the initial state is accounted.
+ *
* @param instructions Update instructions with resource definitions.
+ * @param isProd Flag indicating whether the prod or non-prod calculation requested.
* @return Resources consumed by the update.
*/
- private static IResourceAggregate toProdResources(IJobUpdateInstructions instructions) {
- double existingCpu = 0;
- int existingRamMb = 0;
- int existingDiskMb = 0;
+ private static IResourceAggregate instructionsToResources(
+ IJobUpdateInstructions instructions,
+ final boolean isProd) {
+
+ // Calculate initial state consumption.
+ IResourceAggregate initial = ResourceAggregates.EMPTY;
for (IInstanceTaskConfig group : instructions.getInitialState()) {
ITaskConfig task = group.getTask();
- if (task.isProduction()) {
+ if (isProd == task.isProduction()) {
for (IRange range : group.getInstances()) {
- int numInstances = range.getLast() - range.getFirst() + 1;
- existingCpu += task.getNumCpus() * numInstances;
- existingRamMb += task.getRamMb() * numInstances;
- existingDiskMb += task.getDiskMb() * numInstances;
+ initial = add(initial, ResourceAggregates.scale(
+ fromTasks(ImmutableSet.of(task)),
+ instanceCountFromRange(range)));
}
}
}
- // Calculate desired prod task consumption.
+ // Calculate desired state consumption.
IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
- .transform(TO_PROD_RESOURCES).or(ResourceAggregates.EMPTY);
+ .transform(new Function<IInstanceTaskConfig, IResourceAggregate>() {
+ @Override
+ public IResourceAggregate apply(IInstanceTaskConfig input) {
+ return isProd == input.getTask().isProduction()
+ ? ResourceAggregates.scale(
+ fromTasks(ImmutableSet.of(input.getTask())),
+ getUpdateInstanceCount(input.getInstances()))
+ : ResourceAggregates.EMPTY;
+ }
+ }).or(ResourceAggregates.EMPTY);
+
+ // Calculate result as max(existing, desired) per resource type.
+ return max(initial, desired);
+ }
- // Calculate result as max(existing, desired) per resource.
+ private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
return IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(Math.max(existingCpu, desired.getNumCpus()))
- .setRamMb(Math.max(existingRamMb, desired.getRamMb()))
- .setDiskMb(Math.max(existingDiskMb, desired.getDiskMb())));
+ .setNumCpus(a.getNumCpus() + b.getNumCpus())
+ .setRamMb(a.getRamMb() + b.getRamMb())
+ .setDiskMb(a.getDiskMb() + b.getDiskMb()));
}
- private static IResourceAggregate prodResourcesFromTasks(Iterable<ITaskConfig> tasks) {
- return fromTasks(FluentIterable.from(tasks).filter(Tasks.IS_PRODUCTION));
+ private static IResourceAggregate max(IResourceAggregate a, IResourceAggregate b) {
+ return IResourceAggregate.build(new ResourceAggregate()
+ .setNumCpus(Math.max(a.getNumCpus(), b.getNumCpus()))
+ .setRamMb(Math.max(a.getRamMb(), b.getRamMb()))
+ .setDiskMb(Math.max(a.getDiskMb(), b.getDiskMb())));
}
private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
@@ -368,18 +456,6 @@ public interface QuotaManager {
.setDiskMb(diskMb));
}
- private static final Function<IInstanceTaskConfig, IResourceAggregate> TO_PROD_RESOURCES =
- new Function<IInstanceTaskConfig, IResourceAggregate>() {
- @Override
- public IResourceAggregate apply(IInstanceTaskConfig input) {
- return input.getTask().isProduction()
- ? ResourceAggregates.scale(
- prodResourcesFromTasks(ImmutableSet.of(input.getTask())),
- getUpdateInstanceCount(input.getInstances()))
- : ResourceAggregates.EMPTY;
- }
- };
-
private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
new Function<IJobUpdate, IJobKey>() {
@Override
@@ -391,10 +467,14 @@ public interface QuotaManager {
private static int getUpdateInstanceCount(Set<IRange> ranges) {
int instanceCount = 0;
for (IRange range : ranges) {
- instanceCount += range.getLast() - range.getFirst() + 1;
+ instanceCount += instanceCountFromRange(range);
}
return instanceCount;
}
+
+ private static int instanceCountFromRange(IRange range) {
+ return range.getLast() - range.getFirst() + 1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/316f291e/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 7a46148..b0772f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -23,6 +23,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.JobUpdate;
import org.apache.aurora.gen.JobUpdateInstructions;
@@ -33,10 +34,11 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ResourceAggregates;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
@@ -68,12 +70,14 @@ public class QuotaManagerImplTest extends EasyMockTest {
private StorageTestUtil storageUtil;
private JobUpdateStore jobUpdateStore;
private QuotaManagerImpl quotaManager;
+ private CronJobManager cronJobManager;
@Before
public void setUp() throws Exception {
storageUtil = new StorageTestUtil(this);
jobUpdateStore = storageUtil.jobUpdateStore;
- quotaManager = new QuotaManagerImpl(storageUtil.storage);
+ cronJobManager = createMock(CronJobManager.class);
+ quotaManager = new QuotaManagerImpl(storageUtil.storage, cronJobManager);
storageUtil.expectOperations();
}
@@ -86,16 +90,50 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(quota);
expectTasks(prodTask, nonProdTask);
expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+ expectCronJobs(
+ createJob(createProdTask("pc", 1, 1, 1), 2),
+ createJob(createNonProdTask("npc", 7, 7, 7), 1));
control.replay();
- QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
- assertEquals(quota, quotaInfo.getQuota());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)), quotaInfo.getProdConsumption());
+ new QuotaInfo(from(4, 4, 4), from(6, 6, 6), from(9, 9, 9)),
+ quotaManager.getQuotaInfo(ROLE));
+ }
+
+ @Test
+ public void testGetQuotaInfoWithCronTasks() {
+ IScheduledTask prodTask = createProdTask("pc", 6, 6, 6);
+ IScheduledTask nonProdTask = createProdTask("npc", 7, 7, 7);
+ IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
+
+ expectQuota(quota);
+ expectTasks(prodTask, nonProdTask);
+ expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+
+ final String pcRole = "pc-role";
+ ScheduledTask ignoredProdTask = createProdTask(pcRole, 20, 20, 20).newBuilder();
+ ignoredProdTask.getAssignedTask().getTask()
+ .setOwner(new Identity(pcRole, "ignored"))
+ .setJob(new JobKey(pcRole, ENV, pcRole));
+
+ final String npcRole = "npc-role";
+ ScheduledTask ignoredNonProdTask = createNonProdTask(npcRole, 20, 20, 20).newBuilder();
+ ignoredNonProdTask.getAssignedTask().getTask()
+ .setOwner(new Identity(npcRole, "ignored"))
+ .setJob(new JobKey(npcRole, ENV, npcRole));
+
+ expectCronJobs(
+ createJob(createProdTask("pc", 3, 3, 3), 1),
+ createJob(createNonProdTask("npc", 5, 5, 5), 2),
+ createJob(IScheduledTask.build(ignoredProdTask), 2),
+ createJob(IScheduledTask.build(ignoredNonProdTask), 3));
+
+ control.replay();
+
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
- quotaInfo.getNonProdConsumption());
+ new QuotaInfo(from(4, 4, 4), from(7, 7, 7), from(10, 10, 10)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -109,35 +147,30 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(quota);
expectTasks(prodTask, updatingProdTask, updatingFilteredProdTask, nonProdTask);
expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+ expectNoCronJobs();
control.replay();
- QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
- assertEquals(quota, quotaInfo.getQuota());
-
// Expected consumption from: prodTask + updatingProdTask + job update.
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(7, 7, 7)), quotaInfo.getProdConsumption());
-
- assertEquals(
- IResourceAggregate.build(new ResourceAggregate(2, 2, 2)),
- quotaInfo.getNonProdConsumption());
+ new QuotaInfo(from(4, 4, 4), from(7, 7, 7), from(2, 2, 2)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
- public void testGetQuotaInfoNoTasksNoUpdates() {
+ public void testGetQuotaInfoNoTasksNoUpdatesNoCronJobs() {
IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate(4, 4, 4));
expectQuota(quota);
expectNoTasks();
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
- QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
- assertEquals(quota, quotaInfo.getQuota());
- assertEquals(ResourceAggregates.none(), quotaInfo.getProdConsumption());
- assertEquals(ResourceAggregates.none(), quotaInfo.getNonProdConsumption());
+ assertEquals(
+ new QuotaInfo(from(4, 4, 4), from(0, 0, 0), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -145,6 +178,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectTasks(createProdTask("foo", 2, 2, 2));
expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+ expectNoCronJobs();
control.replay();
@@ -157,6 +191,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectNoTasks();
expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
+ expectNoCronJobs();
control.replay();
@@ -169,6 +204,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectTasks(createProdTask("foo", 2, 2, 2));
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -181,6 +217,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectNoTasks();
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -194,6 +231,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(createProdTask("foo", 2, 2, 2), createTask("bar", "id2", 5, 5, 5, false, 0));
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -216,6 +254,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectNoTasks();
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -228,6 +267,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectTasks(createProdTask("foo", 3, 3, 3));
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -241,6 +281,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectTasks(createProdTask("foo", 3, 3, 3));
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -254,6 +295,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
expectTasks(createProdTask("foo", 3, 3, 3));
expectNoJobUpdates();
+ expectNoCronJobs();
control.replay();
@@ -263,20 +305,39 @@ public class QuotaManagerImplTest extends EasyMockTest {
}
@Test
+ public void testCheckQuotaExceedsCron() {
+ expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
+ expectNoTasks().times(2);
+ expectNoJobUpdates().times(2);
+ expectCronJobs(
+ createJob(createProdTask("pc", 4, 4, 4), 1),
+ createJob(createNonProdTask("npc", 7, 7, 7), 1)).times(2);
+
+ control.replay();
+
+ QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(2, 2, 2, true), 1);
+ assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+ assertEquals(
+ new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(7, 7, 7)),
+ quotaManager.getQuotaInfo(ROLE));
+ }
+
+ @Test
public void testCheckQuotaUpdatingTasksFilteredOut() {
expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
expectTasks(createProdTask("foo", 2, 2, 2), createTask(JOB_NAME, "id2", 3, 3, 3, true, 0))
.times(2);
expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(2, 2, 2, true), 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -285,14 +346,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
expectJobUpdates(taskConfig(8, 8, 8, false), taskConfig(4, 4, 4, false), 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(8, 8, 8)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -300,15 +362,16 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 1, 1, 1)).times(2);
- expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(4, 4, 4, false), 2);
+ expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(7, 7, 7, false), 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(5, 5, 5), from(4, 4, 4), from(7, 7, 7)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -317,14 +380,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
expectJobUpdates(taskConfig(1, 1, 1, false), taskConfig(1, 1, 1, true), 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(5, 5, 5)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(5, 5, 5), from(5, 5, 5), from(1, 1, 1)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -332,14 +396,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
expectJobUpdates(taskConfig(2, 2, 2, true), taskConfig(1, 1, 1, true), 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -347,14 +412,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
expectJobUpdates(taskConfig(1, 1, 1, true), 1, taskConfig(1, 1, 1, true), 2, 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -362,14 +428,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
expectJobUpdates(taskConfig(1, 1, 1, true), 2, taskConfig(1, 1, 1, true), 1, 2);
+ expectNoCronJobs().times(2);
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -390,13 +457,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expect(jobUpdateStore.fetchJobUpdate(updateId))
.andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -417,13 +486,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expect(jobUpdateStore.fetchJobUpdate(updateId))
.andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -444,13 +515,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
expect(jobUpdateStore.fetchJobUpdate(updateId))
.andReturn(Optional.of(IJobUpdate.build(builder))).times(2);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -470,13 +543,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
config,
1);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -493,13 +568,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
config,
3);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(4, 4, 4), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -519,13 +596,15 @@ public class QuotaManagerImplTest extends EasyMockTest {
config,
1);
+ expectNoCronJobs().times(2);
+
control.replay();
QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
assertEquals(
- IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
- quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+ new QuotaInfo(from(6, 6, 6), from(6, 6, 6), from(0, 0, 0)),
+ quotaManager.getQuotaInfo(ROLE));
}
@Test
@@ -647,6 +726,19 @@ public class QuotaManagerImplTest extends EasyMockTest {
return expectTasks();
}
+ private IExpectationSetters<?> expectNoCronJobs() {
+ return expect(cronJobManager.getJobs()).andReturn(ImmutableSet.<IJobConfiguration>of());
+ }
+
+ private IExpectationSetters<?> expectCronJobs(IJobConfiguration... jobs) {
+ ImmutableSet.Builder<IJobConfiguration> builder = ImmutableSet.builder();
+ for (IJobConfiguration job : jobs) {
+ builder.add(job);
+ }
+
+ return expect(cronJobManager.getJobs()).andReturn(builder.build());
+ }
+
private IExpectationSetters<Optional<IResourceAggregate>> expectQuota(IResourceAggregate quota) {
return expect(storageUtil.quotaStore.fetchQuota(ROLE))
.andReturn(Optional.of(quota));
@@ -662,6 +754,10 @@ public class QuotaManagerImplTest extends EasyMockTest {
return createTask(jobName, jobName + "id1", cpus, ramMb, diskMb, true, 0);
}
+ private IScheduledTask createNonProdTask(String jobName, int cpus, int ramMb, int diskMb) {
+ return createTask(jobName, jobName + "id1", cpus, ramMb, diskMb, false, 0);
+ }
+
private IScheduledTask createTask(
String jobName,
String taskId,
@@ -687,4 +783,16 @@ public class QuotaManagerImplTest extends EasyMockTest {
.setDiskMb(diskMb)
.setProduction(production))));
}
+
+ private IJobConfiguration createJob(IScheduledTask scheduledTask, int instanceCount) {
+ TaskConfig task = scheduledTask.newBuilder().getAssignedTask().getTask();
+ return IJobConfiguration.build(new JobConfiguration()
+ .setKey(task.getJob())
+ .setTaskConfig(task)
+ .setInstanceCount(instanceCount));
+ }
+
+ private static IResourceAggregate from(double cpu, int ramMb, int diskMb) {
+ return IResourceAggregate.build(new ResourceAggregate(cpu, ramMb, diskMb));
+ }
}