You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/01/15 22:06:17 UTC
[1/2] Client quota checks. Part 2: server side changes.
Updated Branches:
refs/heads/master a6ab7fdb7 -> e2b357e7f
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index b46f29a..6cefdfa 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -70,6 +70,8 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.quota.QuotaInfo;
+import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.LockManager.LockException;
@@ -125,6 +127,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("test");
private static final Lock DEFAULT_LOCK = null;
+ private static final IQuota QUOTA = IQuota.build(new Quota(10.0, 1024, 2048));
+ private static final IQuota CONSUMED = IQuota.build(new Quota(0.0, 0, 0));
private StorageTestUtil storageUtil;
private SchedulerCore scheduler;
@@ -136,6 +140,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
private MaintenanceController maintenance;
private AuroraAdmin.Iface thrift;
private CronJobManager cronJobManager;
+ private QuotaManager quotaManager;
@Before
public void setUp() throws Exception {
@@ -150,6 +155,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
recovery = createMock(Recovery.class);
maintenance = createMock(MaintenanceController.class);
cronJobManager = createMock(CronJobManager.class);
+ quotaManager = createMock(QuotaManager.class);
// Use guice and install AuthModule to apply AOP-style auth layer.
Module testModule = new AbstractModule() {
@@ -163,6 +169,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
bind(Recovery.class).toInstance(recovery);
bind(MaintenanceController.class).toInstance(maintenance);
bind(CronJobManager.class).toInstance(cronJobManager);
+ bind(QuotaManager.class).toInstance(quotaManager);
bind(AuroraAdmin.Iface.class).to(SchedulerThriftInterface.class);
}
};
@@ -194,7 +201,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
- public void testPopulateJobConfigFailFilter() throws Exception {
+ public void testPopulateJobConfigFailQuotaCheck() throws Exception {
IJobConfiguration job = IJobConfiguration.build(makeJob());
scheduler.validateJobResources(anyObject(SanitizedConfiguration.class));
expectLastCall().andThrow(new ScheduleException("err"));
@@ -430,7 +437,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
.setDiskMb(100)
.setRamMb(200);
expectAuth(ROOT, true);
- storageUtil.quotaStore.saveQuota(ROLE, IQuota.build(quota));
+ quotaManager.saveQuota(ROLE, IQuota.build(quota));
control.replay();
@@ -439,6 +446,22 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
+ public void testSetQuotaFails() throws Exception {
+ Quota quota = new Quota()
+ .setNumCpus(10)
+ .setDiskMb(100)
+ .setRamMb(200);
+ expectAuth(ROOT, true);
+ quotaManager.saveQuota(ROLE, IQuota.build(quota));
+ expectLastCall().andThrow(new QuotaManager.QuotaException("fail"));
+
+ control.replay();
+
+ Response response = thrift.setQuota(ROLE, quota, SESSION);
+ assertEquals(ResponseCode.INVALID_REQUEST, response.getResponseCode());
+ }
+
+ @Test
public void testProvisionerSetQuota() throws Exception {
Quota quota = new Quota()
.setNumCpus(10)
@@ -446,7 +469,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
.setRamMb(200);
expectAuth(ROOT, false);
expectAuth(Capability.PROVISIONER, true);
- storageUtil.quotaStore.saveQuota(ROLE, IQuota.build(quota));
+ quotaManager.saveQuota(ROLE, IQuota.build(quota));
control.replay();
@@ -1319,6 +1342,20 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
assertEquals(ResponseCode.LOCK_ERROR, response.getResponseCode());
}
+ @Test
+ public void testGetQuota() throws Exception {
+ QuotaInfo infoMock = createMock(QuotaInfo.class);
+ expect(quotaManager.getQuotaInfo(ROLE)).andReturn(infoMock);
+ expect(infoMock.guota()).andReturn(QUOTA);
+ expect(infoMock.prodConsumption()).andReturn(CONSUMED);
+ control.replay();
+
+ Response response = thrift.getQuota(ROLE);
+ assertEquals(ResponseCode.OK, response.getResponseCode());
+ assertEquals(QUOTA.newBuilder(), response.getResult().getGetQuotaResult().getQuota());
+ assertEquals(CONSUMED.newBuilder(), response.getResult().getGetQuotaResult().getConsumed());
+ }
+
private static JobConfiguration makeJob() {
return makeJob(nonProductionTask(), 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index cce27a0..644d6e8 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -34,6 +34,7 @@ import org.apache.aurora.gen.AuroraAdmin;
import org.apache.aurora.gen.Quota;
import org.apache.aurora.gen.SessionKey;
import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.SchedulerCore;
@@ -66,6 +67,7 @@ public class ThriftIT extends EasyMockTest {
private AuroraAdmin.Iface thrift;
private StorageTestUtil storageTestUtil;
private SessionContext context;
+ private QuotaManager quotaManager;
private final SessionValidator validator = new SessionValidator() {
@Override public SessionContext checkAuthenticated(
@@ -120,6 +122,7 @@ public class ThriftIT extends EasyMockTest {
@Before
public void setUp() {
context = createMock(SessionContext.class);
+ quotaManager = createMock(QuotaManager.class);
createThrift(CAPABILITIES);
}
@@ -146,6 +149,7 @@ public class ThriftIT extends EasyMockTest {
bind(Storage.class).toInstance(storageTestUtil.storage);
bindMock(StorageBackup.class);
bindMock(ThriftConfiguration.class);
+ bind(QuotaManager.class).toInstance(quotaManager);
bind(SessionValidator.class).toInstance(validator);
bind(CapabilityValidator.class).toInstance(new CapabilityValidatorFake(validator));
}
@@ -163,8 +167,7 @@ public class ThriftIT extends EasyMockTest {
@Test
public void testProvisionAccess() throws Exception {
- storageTestUtil.expectOperations();
- storageTestUtil.quotaStore.saveQuota(USER, QUOTA);
+ quotaManager.saveQuota(USER, QUOTA);
expectLastCall().times(2);
control.replay();
[2/2] git commit: Client quota checks. Part 2: server side changes.
Posted by ma...@apache.org.
Client quota checks. Part 2: server side changes.
Reviewed at https://reviews.apache.org/r/16629/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e2b357e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e2b357e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e2b357e7
Branch: refs/heads/master
Commit: e2b357e7fc8cfd29f703f9f9d6f7e1b84babf25b
Parents: a6ab7fd
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Jan 15 13:04:54 2014 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Wed Jan 15 13:04:54 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/http/SchedulerzRole.java | 24 +--
.../scheduler/quota/QuotaCheckResult.java | 117 +++++++++++
.../scheduler/quota/QuotaComparisonResult.java | 90 ---------
.../aurora/scheduler/quota/QuotaFilter.java | 85 --------
.../aurora/scheduler/quota/QuotaInfo.java | 62 ++++++
.../aurora/scheduler/quota/QuotaManager.java | 147 ++++++++++----
.../aurora/scheduler/quota/QuotaModule.java | 4 -
.../apache/aurora/scheduler/quota/Quotas.java | 90 ---------
.../aurora/scheduler/state/JobFilter.java | 134 -------------
.../scheduler/state/SchedulerCoreImpl.java | 96 +++++----
.../aurora/scheduler/storage/Storage.java | 18 --
.../storage/testing/StorageTestUtil.java | 6 +
.../thrift/SchedulerThriftInterface.java | 63 +++---
.../scheduler/quota/QuotaCheckResultTest.java | 88 +++++++++
.../quota/QuotaComparisonResultTest.java | 88 ---------
.../aurora/scheduler/quota/QuotaFilterTest.java | 189 ------------------
.../scheduler/quota/QuotaManagerImplTest.java | 197 ++++++++++++-------
.../state/BaseSchedulerCoreImplTest.java | 39 ++--
.../thrift/SchedulerThriftInterfaceTest.java | 43 +++-
.../aurora/scheduler/thrift/ThriftIT.java | 7 +-
20 files changed, 663 insertions(+), 924 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
index 785efd0..b0caca7 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
@@ -36,7 +36,6 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -58,13 +57,12 @@ import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.quota.QuotaInfo;
import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.quota.Quotas;
import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -152,26 +150,14 @@ public class SchedulerzRole extends JerseyTemplateServlet {
template.setAttribute("cronJobs", cronJobs.values());
// TODO(Suman Karumuri): In future compute consumption for role and environment.
- template.setAttribute("prodResourcesUsed", quotaManager.getConsumption(role.get()));
- template.setAttribute("nonProdResourcesUsed", getNonProdConsumption(role.get()));
- template.setAttribute("resourceQuota", getQuota(role.get()));
+ QuotaInfo quotaInfo = quotaManager.getQuotaInfo(role.get());
+ template.setAttribute("prodResourcesUsed", quotaInfo.prodConsumption());
+ template.setAttribute("nonProdResourcesUsed", quotaInfo.nonProdConsumption());
+ template.setAttribute("resourceQuota", quotaInfo.guota());
}
});
}
- private IQuota getQuota(final String role) {
- return Storage.Util.consistentFetchQuota(storage, role).or(Quotas.noQuota());
- }
-
- private IQuota getNonProdConsumption(String role) {
- FluentIterable<ITaskConfig> tasks = FluentIterable
- .from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.roleScoped(role).active()))
- .transform(Tasks.SCHEDULED_TO_INFO)
- .filter(Predicates.not(Tasks.IS_PRODUCTION));
-
- return Quotas.fromTasks(tasks);
- }
-
/**
* Display jobs for a role and environment.
*/
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
new file mode 100644
index 0000000..c898535
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Calculates and formats detailed quota comparison result.
+ */
+public class QuotaCheckResult {
+
+ /**
+ * Quota check result.
+ */
+ public static enum Result {
+ /**
+ * There is sufficient quota for the requested operation.
+ */
+ SUFFICIENT_QUOTA,
+
+ /**
+ * There is not enough allocated quota for the requested operation.
+ */
+ INSUFFICIENT_QUOTA
+ }
+
+ private static enum Resource {
+ CPU("core(s)"),
+ RAM("MB"),
+ DISK("MB");
+
+ private final String unit;
+ private Resource(String unit) {
+ this.unit = unit;
+ }
+ }
+
+ private final Optional<String> details;
+ private final Result result;
+
+ @VisibleForTesting
+ public QuotaCheckResult(Result result) {
+ this(result, Optional.<String>absent());
+ }
+
+ private QuotaCheckResult(Result result, Optional<String> details) {
+ this.result = checkNotNull(result);
+ this.details = checkNotNull(details);
+ }
+
+ /**
+ * Gets quota check result.
+ *
+ * @return Quota check result.
+ */
+ public Result getResult() {
+ return result;
+ }
+
+ /**
+ * Gets detailed quota violation description in case quota check fails.
+ *
+ * @return Quota check details.
+ */
+ public Optional<String> getDetails() {
+ return details;
+ }
+
+ static QuotaCheckResult greaterOrEqual(IQuota a, IQuota b) {
+ StringBuilder details = new StringBuilder();
+ boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details)
+ & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details)
+ & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
+
+ return new QuotaCheckResult(
+ result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA,
+ Optional.of(details.toString()));
+ }
+
+ private static boolean compare(
+ double a,
+ double b,
+ Resource resource,
+ StringBuilder details) {
+
+ boolean result = a >= b;
+ if (!result) {
+ details
+ .append(details.length() > 0 ? "; " : "")
+ .append(resource)
+ .append(" quota exceeded by ")
+ .append(String.format("%.2f", b - a))
+ .append(" ")
+ .append(resource.unit);
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
deleted file mode 100644
index 99d2e4c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaComparisonResult.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-
-/**
- * Calculates and formats detailed quota comparison result.
- */
-class QuotaComparisonResult {
-
- private enum Resource {
- CPU("core(s)"),
- RAM("MB"),
- DISK("MB");
-
- private final String unit;
- private Resource(String unit) {
- this.unit = unit;
- }
- }
-
- enum Result {
- SUFFICIENT_QUOTA,
- INSUFFICIENT_QUOTA
- }
-
- private final String details;
- private final Result result;
-
- @VisibleForTesting
- QuotaComparisonResult(Result result, String details) {
- this.result = result;
- this.details = details;
- }
-
- Result result() {
- return result;
- }
-
- String details() {
- return details;
- }
-
- static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
- StringBuilder details = new StringBuilder();
- boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details)
- & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details)
- & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details);
-
- return new QuotaComparisonResult(
- result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA,
- details.toString());
- }
-
- private static boolean compare(
- double a,
- double b,
- Resource resource,
- StringBuilder details) {
-
- boolean result = a >= b;
- if (!result) {
- details
- .append(details.length() > 0 ? "; " : "")
- .append(resource)
- .append(" quota exceeded by ")
- .append(String.format("%.2f", b - a))
- .append(" ")
- .append(resource.unit);
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
deleted file mode 100644
index 6ab7982..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaFilter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.state.JobFilter;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-
-/**
- * A filter that fails production jobs for roles that do not have sufficient quota to run them.
- */
-class QuotaFilter implements JobFilter {
- private final QuotaManagerImpl quotaManager;
- private final Storage storage;
-
- @Inject
- QuotaFilter(QuotaManagerImpl quotaManager, Storage storage) {
- this.quotaManager = checkNotNull(quotaManager);
- this.storage = checkNotNull(storage);
- }
-
- @Override
- public JobFilterResult filter(final IJobConfiguration job) {
- return filterByTask(job.getKey(), job.getTaskConfig(), job.getInstanceCount());
- }
-
- @Override
- public JobFilterResult filter(ITaskConfig template, int instanceCount) {
- return filterByTask(JobKeys.from(template), template, instanceCount);
- }
-
- private synchronized JobFilterResult filterByTask(
- IJobKey jobKey,
- ITaskConfig template,
- int instanceCount) {
-
- if (!template.isProduction()) {
- return JobFilterResult.pass();
- }
-
- IQuota currentUsage = Quotas.fromProductionTasks(
- Iterables.transform(
- Storage.Util.consistentFetchTasks(storage, Query.jobScoped(jobKey).active()),
- Tasks.SCHEDULED_TO_INFO));
-
- IQuota additionalRequested =
- Quotas.subtract(Quotas.fromTasks(template, instanceCount), currentUsage);
- QuotaComparisonResult comparisonResult =
- quotaManager.checkQuota(jobKey.getRole(), additionalRequested);
-
- if (comparisonResult.result() == INSUFFICIENT_QUOTA) {
- return JobFilterResult.fail("Insufficient resource quota: " + comparisonResult.details());
- }
-
- return JobFilterResult.pass();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
new file mode 100644
index 0000000..0e286d8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Wraps allocated quota and consumption details.
+ */
+public class QuotaInfo {
+ private final IQuota quota;
+ private final IQuota prodConsumption;
+ private final IQuota nonProdConsumption;
+
+ QuotaInfo(IQuota quota, IQuota prodConsumption, IQuota nonProdConsumption) {
+ this.quota = checkNotNull(quota);
+ this.prodConsumption = checkNotNull(prodConsumption);
+ this.nonProdConsumption = checkNotNull(nonProdConsumption);
+ }
+
+ /**
+ * Total allocated quota (not reduced by current consumption).
+ *
+ * @return Allocated quota.
+ */
+ public IQuota guota() {
+ return quota;
+ }
+
+ /**
+ * Quota consumed by production jobs.
+ *
+ * @return Production job consumption.
+ */
+ public IQuota prodConsumption() {
+ return prodConsumption;
+ }
+
+ /**
+ * Quota consumed by non-production jobs.
+ *
+ * @return Non production job consumption.
+ */
+ public IQuota nonProdConsumption() {
+ return nonProdConsumption;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 6b0645b..c93e57f 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -15,76 +15,149 @@
*/
package org.apache.aurora.scheduler.quota;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
+import org.apache.aurora.gen.Quota;
+import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
import org.apache.aurora.scheduler.storage.entities.IQuota;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
/**
* Allows access to resource quotas, and tracks quota consumption.
*/
public interface QuotaManager {
/**
- * Fetches the current resource usage for the role.
+ * Saves a new quota for the provided role or overrides the existing one.
+ *
+ * @param role Quota owner.
+ * @param quota Quota to save.
+ * @throws QuotaException If provided quota specification is invalid.
+ */
+ void saveQuota(String role, IQuota quota) throws QuotaException;
+
+ /**
+ * Gets {@code QuotaInfo} for the specified role.
+ *
+ * @param role Quota owner.
+ * @return {@code QuotaInfo} instance.
+ */
+ QuotaInfo getQuotaInfo(String role);
+
+ /**
+ * Checks if there is enough resource quota available for adding {@code instances}
+ * of {@code template} tasks. The quota is defined at the task owner (role) level.
*
- * @param role to fetch quota usage for.
- * @return Resource quota used by {@code role}.
+ * @param template Single task resource requirements.
+ * @param instances Number of task instances.
+ * @return {@code QuotaCheckResult} instance with quota check result details.
*/
- IQuota getConsumption(String role);
+ QuotaCheckResult checkQuota(ITaskConfig template, int instances);
+
+ /**
+ * Thrown when quota related operation failed.
+ */
+ class QuotaException extends Exception {
+ public QuotaException(String msg) {
+ super(msg);
+ }
+ }
/**
* Quota provider that stores quotas in the canonical {@link Storage} system.
*/
- static class QuotaManagerImpl implements QuotaManager {
+ class QuotaManagerImpl implements QuotaManager {
private final Storage storage;
@Inject
- public QuotaManagerImpl(Storage storage) {
+ QuotaManagerImpl(Storage storage) {
this.storage = checkNotNull(storage);
}
@Override
- public IQuota getConsumption(final String role) {
- checkNotBlank(role);
-
- final Query.Builder query = Query.roleScoped(role).active();
-
- return storage.consistentRead(
- new Work.Quiet<IQuota>() {
- @Override public IQuota apply(StoreProvider storeProvider) {
- return Quotas.fromProductionTasks(Iterables.transform(
- storeProvider.getTaskStore().fetchTasks(query), Tasks.SCHEDULED_TO_INFO));
- }
- });
+ public void saveQuota(final String ownerRole, final IQuota quota) throws QuotaException {
+ if (!quota.isSetNumCpus() || !quota.isSetRamMb() || !quota.isSetDiskMb()) {
+ throw new QuotaException("Missing quota specification(s) in: " + quota.toString());
+ }
+
+ if (quota.getNumCpus() < 0.0 || quota.getRamMb() < 0 || quota.getDiskMb() < 0) {
+ throw new QuotaException("Negative values in: " + quota.toString());
+ }
+
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getQuotaStore().saveQuota(ownerRole, quota);
+ }
+ });
}
- /**
- * Tests whether the role has at least the specified amount of quota available.
- *
- * @param role Role to consume quota for.
- * @param quota Quota amount to check for availability.
- * @return QuotaComparisonResult with {@code result()} returning {@code true} if the role
- * currently has at least {@code quota} quota remaining, {@code false} otherwise.
- */
- QuotaComparisonResult checkQuota(final String role, final IQuota quota) {
- checkNotBlank(role);
- checkNotNull(quota);
-
- return storage.consistentRead(new Quiet<QuotaComparisonResult>() {
- @Override public QuotaComparisonResult apply(StoreProvider storeProvider) {
- IQuota reserved = storeProvider.getQuotaStore().fetchQuota(role).or(Quotas.noQuota());
- return Quotas.greaterOrEqual(reserved, Quotas.add(getConsumption(role), quota));
+ @Override
+ public QuotaInfo getQuotaInfo(final String role) {
+ return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
+ @Override public QuotaInfo apply(StoreProvider storeProvider) {
+ FluentIterable<ITaskConfig> tasks = FluentIterable
+ .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
+ .transform(Tasks.SCHEDULED_TO_INFO);
+
+ IQuota prodConsumed = fromTasks(tasks.filter(Tasks.IS_PRODUCTION));
+ IQuota nonProdConsumed =
+ fromTasks(tasks.filter(Predicates.not(Tasks.IS_PRODUCTION)));
+
+ IQuota quota = storeProvider.getQuotaStore().fetchQuota(role).or(Quotas.noQuota());
+
+ return new QuotaInfo(quota, prodConsumed, nonProdConsumed);
}
});
}
+
+ @Override
+ public QuotaCheckResult checkQuota(ITaskConfig template, int instances) {
+ if (!template.isProduction()) {
+ return new QuotaCheckResult(SUFFICIENT_QUOTA);
+ }
+
+ QuotaInfo quotaInfo = getQuotaInfo(JobKeys.from(template).getRole());
+
+ IQuota additionalRequested =
+ Quotas.scale(fromTasks(ImmutableSet.of(template)), instances);
+
+ return QuotaCheckResult.greaterOrEqual(
+ quotaInfo.guota(),
+ add(quotaInfo.prodConsumption(), additionalRequested));
+ }
+
+ private static IQuota fromTasks(Iterable<ITaskConfig> tasks) {
+ double cpu = 0;
+ int ramMb = 0;
+ int diskMb = 0;
+ for (ITaskConfig task : tasks) {
+ cpu += task.getNumCpus();
+ ramMb += task.getRamMb();
+ diskMb += task.getDiskMb();
+ }
+
+ return IQuota.build(new Quota()
+ .setNumCpus(cpu)
+ .setRamMb(ramMb)
+ .setDiskMb(diskMb));
+ }
+
+ private static IQuota add(IQuota a, IQuota b) {
+ return IQuota.build(new Quota()
+ .setNumCpus(a.getNumCpus() + b.getNumCpus())
+ .setRamMb(a.getRamMb() + b.getRamMb())
+ .setDiskMb(a.getDiskMb() + b.getDiskMb()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
index 4a61949..fea2656 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaModule.java
@@ -20,7 +20,6 @@ import javax.inject.Singleton;
import com.google.inject.AbstractModule;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.state.JobFilter;
import org.apache.aurora.scheduler.storage.Storage;
/**
@@ -34,8 +33,5 @@ public class QuotaModule extends AbstractModule {
bind(QuotaManager.class).to(QuotaManagerImpl.class);
bind(QuotaManagerImpl.class).in(Singleton.class);
-
- bind(JobFilter.class).to(QuotaFilter.class);
- bind(QuotaFilter.class).in(Singleton.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
index 24f2093..9da3c3e 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/Quotas.java
@@ -15,16 +15,10 @@
*/
package org.apache.aurora.scheduler.quota;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.aurora.gen.Quota;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkNotNull;
/**
* Convenience class for normalizing resource measures between tasks and offers.
@@ -46,79 +40,6 @@ public final class Quotas {
}
/**
- * Determines the amount of quota required for a set of job tasks.
- *
- * @param taskConfig Task template to count quota from.
- * @return Quota requirement to run {@code job}.
- */
- public static IQuota fromTasks(ITaskConfig taskConfig, int instanceCount) {
- return scale(fromProductionTasks(ImmutableSet.of(taskConfig)), instanceCount);
- }
-
- // TODO(Suman Karumuri): Refactor this function in to a new class.
- // TODO(Suman Karumuri): Rename Quota to something more meaningful (ex: ResourceAggregate)
- /**
- * Determines the amount of quota required for production tasks among {@code tasks}.
- *
- * @param tasks Tasks to count quota from.
- * @return Quota requirement to run {@code tasks}.
- */
- public static IQuota fromProductionTasks(Iterable<ITaskConfig> tasks) {
- checkNotNull(tasks);
-
- return fromTasks(Iterables.filter(tasks, Tasks.IS_PRODUCTION));
- }
-
- /**
- * Determines the amount of quota required for the given tasks.
- *
- * @param tasks Tasks to count quota from.
- * @return Quota requirement to run {@code tasks}.
- */
- public static IQuota fromTasks(Iterable<ITaskConfig> tasks) {
- double cpu = 0;
- int ramMb = 0;
- int diskMb = 0;
- for (ITaskConfig task : tasks) {
- cpu += task.getNumCpus();
- ramMb += task.getRamMb();
- diskMb += task.getDiskMb();
- }
-
- return IQuota.build(new Quota()
- .setNumCpus(cpu)
- .setRamMb(ramMb)
- .setDiskMb(diskMb));
- }
-
- /**
- * a >= b
- */
- public static QuotaComparisonResult greaterOrEqual(IQuota a, IQuota b) {
- return QuotaComparisonResult.greaterOrEqual(a, b);
- }
-
- /**
- * a + b
- */
- public static IQuota add(IQuota a, IQuota b) {
- return IQuota.build(new Quota()
- .setNumCpus(a.getNumCpus() + b.getNumCpus())
- .setRamMb(a.getRamMb() + b.getRamMb())
- .setDiskMb(a.getDiskMb() + b.getDiskMb()));
- }
-
- /**
- * a - b
- */
- public static IQuota subtract(IQuota a, IQuota b) {
- return IQuota.build(new Quota()
- .setNumCpus(a.getNumCpus() - b.getNumCpus())
- .setRamMb(a.getRamMb() - b.getRamMb())
- .setDiskMb(a.getDiskMb() - b.getDiskMb()));
- }
-
- /**
* a * m
*/
public static IQuota scale(IQuota a, int m) {
@@ -141,15 +62,4 @@ public final class Quotas {
(double) a.getDiskMb() / b.getDiskMb()
).intValue();
}
-
- /**
- * sum(qs)
- */
- public static IQuota sum(Iterable<IQuota> qs) {
- IQuota sum = noQuota();
- for (IQuota q : qs) {
- sum = Quotas.add(sum, q);
- }
- return sum;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java b/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
deleted file mode 100644
index 0d84c1e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/JobFilter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * An action that either accepts a configuration or rejects it with a reason.
- */
-public interface JobFilter {
- /**
- * Accept the JobConfiguration or reject it with a reason.
- *
- * @param jobConfiguration The job configuration to filter.
- * @return A result and the reason the result was reached.
- */
- JobFilterResult filter(IJobConfiguration jobConfiguration);
-
- /**
- * Accept the TaskConfig with the specified number of instances
- * or reject it with a reason.
- *
- * @param template The task configuration to filter.
- * @param instanceCount Number of instances to apply taskConfig to.
- * @return A result and the reason the result was reached.
- */
- JobFilterResult filter(ITaskConfig template, int instanceCount);
-
- /**
- * An indication of whether a job passed a filter or not.
- */
- public static final class JobFilterResult {
- private final boolean pass;
- private final String reason;
-
- private JobFilterResult(boolean pass, String reason) {
- this.pass = pass;
- this.reason = Preconditions.checkNotNull(reason);
- }
-
- /**
- * Create a new result indicating the job has passed the filter.
- *
- * @return A result indicating the job has passed with a default reason.
- * @see #fail(String)
- */
- public static JobFilterResult pass() {
- return new JobFilterResult(true, "Accepted by filter.");
- }
-
- /**
- * Create a new result indicating the job has passed the filter.
- *
- * @param reason A reason that the job has passed.
- * @return A result indicating the job has passed because of the given reason.
- * @throws NullPointerException if reason is {@code null}.
- * @see #fail(String)
- */
- public static JobFilterResult pass(String reason) {
- return new JobFilterResult(true, reason);
- }
-
- /**
- * Create a new result indicating the job has failed the filter.
- *
- * @param reason A reason that the job has failed.
- * @return A result indicating the job has failed because of the given reason.
- * @throws NullPointerException if {@code reason} is {@code null}.
- * @see #pass()
- * @see #pass(String)
- */
- public static JobFilterResult fail(String reason) {
- return new JobFilterResult(false, reason);
- }
-
- /**
- * Indicates whether the result indicates a pass.
- *
- * @return {@code true} if the result indicates a pass.
- */
- public boolean isPass() {
- return pass;
- }
-
- /**
- * Indicates the reason for the result.
- *
- * @return The reason for the result.
- */
- public String getReason() {
- return reason;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof JobFilterResult)) {
- return false;
- }
- JobFilterResult that = (JobFilterResult) o;
- return Objects.equal(pass, that.pass)
- && Objects.equal(reason, that.reason);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(pass, reason);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("pass", pass)
- .add("reason", reason)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index 8dec283..b3f19cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -43,6 +43,8 @@ import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -57,6 +59,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
/**
* Implementation of the scheduler core.
@@ -79,7 +82,7 @@ class SchedulerCoreImpl implements SchedulerCore {
private final StateManager stateManager;
private final TaskIdGenerator taskIdGenerator;
- private final JobFilter jobFilter;
+ private final QuotaManager quotaManager;
/**
* Creates a new core scheduler.
@@ -89,7 +92,7 @@ class SchedulerCoreImpl implements SchedulerCore {
* @param immediateScheduler Immediate scheduler.
* @param stateManager Persistent state manager.
* @param taskIdGenerator Task ID generator.
- * @param jobFilter Job filter.
+ * @param quotaManager Quota manager.
*/
@Inject
public SchedulerCoreImpl(
@@ -98,7 +101,7 @@ class SchedulerCoreImpl implements SchedulerCore {
ImmediateJobManager immediateScheduler,
StateManager stateManager,
TaskIdGenerator taskIdGenerator,
- JobFilter jobFilter) {
+ QuotaManager quotaManager) {
this.storage = checkNotNull(storage);
@@ -108,7 +111,7 @@ class SchedulerCoreImpl implements SchedulerCore {
this.cronScheduler = cronScheduler;
this.stateManager = checkNotNull(stateManager);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
- this.jobFilter = checkNotNull(jobFilter);
+ this.quotaManager = checkNotNull(quotaManager);
}
private boolean hasActiveJob(IJobConfiguration job) {
@@ -123,30 +126,38 @@ class SchedulerCoreImpl implements SchedulerCore {
}
@Override
- public synchronized void createJob(SanitizedConfiguration sanitizedConfiguration)
+ public synchronized void createJob(final SanitizedConfiguration sanitizedConfiguration)
throws ScheduleException {
- IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- if (hasActiveJob(job)) {
- throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
- }
+ storage.write(new MutateWork.NoResult<ScheduleException>() {
+ @Override protected void execute(MutableStoreProvider storeProvider)
+ throws ScheduleException {
+
+ final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
+ if (hasActiveJob(job)) {
+ throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+ }
- runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+ validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
- boolean accepted = false;
- for (final JobManager manager : jobManagers) {
- if (manager.receiveJob(sanitizedConfiguration)) {
- LOG.info("Job accepted by manager: " + manager.getUniqueKey());
- accepted = true;
- break;
- }
- }
+ boolean accepted = false;
+ // TODO(wfarner): Remove the JobManager abstraction, and directly invoke addInstances
+ // here for non-cron jobs.
+ for (final JobManager manager : jobManagers) {
+ if (manager.receiveJob(sanitizedConfiguration)) {
+ LOG.info("Job accepted by manager: " + manager.getUniqueKey());
+ accepted = true;
+ break;
+ }
+ }
- if (!accepted) {
- LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
- LOG.severe("Discarded job: " + job);
- throw new ScheduleException("Job not accepted, discarding.");
- }
+ if (!accepted) {
+ LOG.severe("Job was not accepted by any of the configured schedulers, discarding.");
+ LOG.severe("Discarded job: " + job);
+ throw new ScheduleException("Job not accepted, discarding.");
+ }
+ }
+ });
}
// This number is derived from the maximum file name length limit on most UNIX systems, less
@@ -155,31 +166,32 @@ class SchedulerCoreImpl implements SchedulerCore {
@VisibleForTesting
static final int MAX_TASK_ID_LENGTH = 255 - 90;
- // TODO(maximk): Consider a better approach to quota checking. MESOS-4476.
- private void runJobFilters(IJobKey jobKey, ITaskConfig task, int count, boolean incremental)
+ /**
+ * Validates task-specific requirements, including name, count and quota checks.
+ * Must be performed inside a write storage transaction, together with the state mutation
+ * it guards, to avoid data races.
+ *
+ * @param task Task configuration.
+ * @param instances Number of task instances.
+ * @throws ScheduleException If validation fails.
+ */
+ private void validateTaskLimits(ITaskConfig task, int instances)
throws ScheduleException {
- int instanceCount = count;
- if (incremental) {
- instanceCount +=
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.jobScoped(jobKey).active()).size();
- }
-
// TODO(maximk): This is a short-term hack to stop the bleeding from
// https://issues.apache.org/jira/browse/MESOS-691
- if (taskIdGenerator.generate(task, instanceCount).length() > MAX_TASK_ID_LENGTH) {
+ if (taskIdGenerator.generate(task, instances).length() > MAX_TASK_ID_LENGTH) {
throw new ScheduleException(
"Task ID is too long, please shorten your role or job name.");
}
- JobFilter.JobFilterResult filterResult = jobFilter.filter(task, instanceCount);
- // TODO(maximk): Consider deprecating JobFilterResult in favor of custom exception.
- if (!filterResult.isPass()) {
- throw new ScheduleException(filterResult.getReason());
+ if (instances > MAX_TASKS_PER_JOB.get()) {
+ throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
}
- if (instanceCount > MAX_TASKS_PER_JOB.get()) {
- throw new ScheduleException("Job exceeds task limit of " + MAX_TASKS_PER_JOB.get());
+ QuotaCheckResult quotaCheck = quotaManager.checkQuota(task, instances);
+ if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
+ throw new ScheduleException("Insufficient resource quota: " + quotaCheck.getDetails());
}
}
@@ -188,7 +200,7 @@ class SchedulerCoreImpl implements SchedulerCore {
throws ScheduleException {
IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- runJobFilters(job.getKey(), job.getTaskConfig(), job.getInstanceCount(), false);
+ validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
}
@Override
@@ -197,12 +209,12 @@ class SchedulerCoreImpl implements SchedulerCore {
final ImmutableSet<Integer> instanceIds,
final ITaskConfig config) throws ScheduleException {
- runJobFilters(jobKey, config, instanceIds.size(), true);
storage.write(new MutateWork.NoResult<ScheduleException>() {
- @Override
- protected void execute(MutableStoreProvider storeProvider)
+ @Override protected void execute(MutableStoreProvider storeProvider)
throws ScheduleException {
+ validateTaskLimits(config, instanceIds.size());
+
ImmutableSet<IScheduledTask> tasks =
storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..53d0c85 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -20,13 +20,11 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.inject.BindingAnnotation;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
/**
@@ -302,21 +300,5 @@ public interface Storage {
}
});
}
-
- /**
- * Fetch quota for {@code role} from {@code storage} in a consistent read operation.
- *
- * @param storage Storage instance to fetch quota from.
- * @param role Role to fetch quota for.
- * @return Quota returned from the fetch operation.
- * @see QuotaStore#fetchQuota(String)
- */
- public static Optional<IQuota> consistentFetchQuota(Storage storage, final String role) {
- return storage.consistentRead(new Work.Quiet<Optional<IQuota>>() {
- @Override public Optional<IQuota> apply(StoreProvider storeProvider) {
- return storeProvider.getQuotaStore().fetchQuota(role);
- }
- });
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index 8fb51d6..6f71fe2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -15,6 +15,7 @@
*/
package org.apache.aurora.scheduler.storage.testing;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -30,6 +31,7 @@ import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IQuota;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.easymock.Capture;
import org.easymock.IAnswer;
@@ -131,4 +133,8 @@ public class StorageTestUtil {
public IExpectationSetters<?> expectTaskFetch(Query.Builder query, IScheduledTask... result) {
return expectTaskFetch(query, ImmutableSet.<IScheduledTask>builder().add(result).build());
}
+
+ public IExpectationSetters<?> expectQuotaFetch(String role, Optional<IQuota> result) {
+ return expect(quotaStore.fetchQuota(role)).andReturn(result);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 76caa62..cf9099f 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -96,7 +96,9 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.quota.Quotas;
+import org.apache.aurora.scheduler.quota.QuotaInfo;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.LockManager.LockException;
@@ -106,8 +108,6 @@ import org.apache.aurora.scheduler.storage.JobStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.backup.Recovery;
import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException;
import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -170,6 +170,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
private final Recovery recovery;
private final MaintenanceController maintenance;
private final CronJobManager cronJobManager;
+ private final QuotaManager quotaManager;
private final Amount<Long, Time> killTaskInitialBackoff;
private final Amount<Long, Time> killTaskMaxBackoff;
@@ -182,7 +183,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
StorageBackup backup,
Recovery recovery,
CronJobManager cronJobManager,
- MaintenanceController maintenance) {
+ MaintenanceController maintenance,
+ QuotaManager quotaManager) {
this(storage,
schedulerCore,
@@ -192,6 +194,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
recovery,
maintenance,
cronJobManager,
+ quotaManager,
KILL_TASK_INITIAL_BACKOFF.get(),
KILL_TASK_MAX_BACKOFF.get());
}
@@ -206,6 +209,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
Recovery recovery,
MaintenanceController maintenance,
CronJobManager cronJobManager,
+ QuotaManager quotaManager,
Amount<Long, Time> initialBackoff,
Amount<Long, Time> maxBackoff) {
@@ -217,6 +221,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
this.recovery = checkNotNull(recovery);
this.maintenance = checkNotNull(maintenance);
this.cronJobManager = checkNotNull(cronJobManager);
+ this.quotaManager = checkNotNull(quotaManager);
this.killTaskInitialBackoff = checkNotNull(initialBackoff);
this.killTaskMaxBackoff = checkNotNull(maxBackoff);
}
@@ -308,7 +313,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
SanitizedConfiguration sanitized =
SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description));
- // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476.
+ // TODO(maximk): Drop this validation once migration to client quota checks is completed.
if (validation != null && validation == JobConfigValidation.RUN_FILTERS) {
schedulerCore.validateJobResources(sanitized);
}
@@ -590,16 +595,28 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
public Response getQuota(final String ownerRole) {
checkNotBlank(ownerRole);
- IQuota quota = storage.consistentRead(new Work.Quiet<IQuota>() {
- @Override public IQuota apply(StoreProvider storeProvider) {
- return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota());
- }
- });
+ QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole);
+ GetQuotaResult result = new GetQuotaResult(quotaInfo.guota().newBuilder())
+ .setConsumed(quotaInfo.prodConsumption().newBuilder());
- return new Response()
- .setResponseCode(OK)
- .setResult(Result.getQuotaResult(new GetQuotaResult()
- .setQuota(quota.newBuilder())));
+ return new Response().setResponseCode(OK).setResult(Result.getQuotaResult(result));
+ }
+
+
+ @Requires(whitelist = Capability.PROVISIONER)
+ @Override
+ public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
+ checkNotBlank(ownerRole);
+ checkNotNull(quota);
+ checkNotNull(session);
+
+ Response response = new Response();
+ try {
+ quotaManager.saveQuota(ownerRole, IQuota.build(quota));
+ return response.setResponseCode(OK).setMessage("Quota applied.");
+ } catch (QuotaException e) {
+ return response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage());
+ }
}
@Override
@@ -634,24 +651,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
.setStatuses(maintenance.endMaintenance(hosts.getHostNames()))));
}
- @Requires(whitelist = Capability.PROVISIONER)
- @Override
- public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) {
- checkNotBlank(ownerRole);
- checkNotNull(quota);
- checkNotNull(session);
-
- // TODO(Kevin Sweeney): Input validation for Quota.
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota));
- }
- });
-
- return new Response().setResponseCode(OK).setMessage("Quota applied.");
- }
-
@Override
public Response forceTaskState(
String taskId,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
new file mode 100644
index 0000000..e366026
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.quota;
+
+import org.apache.aurora.gen.Quota;
+import org.apache.aurora.scheduler.storage.entities.IQuota;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class QuotaCheckResultTest {
+
+ @Test
+ public void testGreaterOrEqualPass() {
+ IQuota quota = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ IQuota request = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ assertEquals(SUFFICIENT_QUOTA, QuotaCheckResult.greaterOrEqual(quota, request).getResult());
+ }
+
+ @Test
+ public void testGreaterOrEqualFailsCpu() {
+ IQuota quota = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ IQuota request = IQuota.build(new Quota()
+ .setNumCpus(2.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+ assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+ assertTrue(result.getDetails().get().contains("CPU"));
+ }
+
+ @Test
+ public void testGreaterOrEqualFailsRam() {
+ IQuota quota = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ IQuota request = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(512L)
+ .setDiskMb(512L));
+ QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+ assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+ assertTrue(result.getDetails().get().length() > 0);
+ assertTrue(result.getDetails().get().contains("RAM"));
+ }
+
+ @Test
+ public void testGreaterOrEqualFailsDisk() {
+ IQuota quota = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(512L));
+ IQuota request = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(256L)
+ .setDiskMb(1024L));
+ QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request);
+ assertEquals(INSUFFICIENT_QUOTA, result.getResult());
+ assertTrue(result.getDetails().get().length() > 0);
+ assertTrue(result.getDetails().get().contains("DISK"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
deleted file mode 100644
index 23069b8..0000000
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaComparisonResultTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import org.apache.aurora.gen.Quota;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class QuotaComparisonResultTest {
-
- @Test
- public void testGreaterOrEqualPass() {
- IQuota quota = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- IQuota request = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- assertEquals(SUFFICIENT_QUOTA, QuotaComparisonResult.greaterOrEqual(quota, request).result());
- }
-
- @Test
- public void testGreaterOrEqualFailsCpu() {
- IQuota quota = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- IQuota request = IQuota.build(new Quota()
- .setNumCpus(2.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
- assertEquals(INSUFFICIENT_QUOTA, result.result());
- assertTrue(result.details().contains("CPU"));
- }
-
- @Test
- public void testGreaterOrEqualFailsRam() {
- IQuota quota = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- IQuota request = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(512L)
- .setDiskMb(512L));
- QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
- assertEquals(INSUFFICIENT_QUOTA, result.result());
- assertTrue(result.details().length() > 0);
- assertTrue(result.details().contains("RAM"));
- }
-
- @Test
- public void testGreaterOrEqualFailsDisk() {
- IQuota quota = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- IQuota request = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(1024L));
- QuotaComparisonResult result = QuotaComparisonResult.greaterOrEqual(quota, request);
- assertEquals(INSUFFICIENT_QUOTA, result.result());
- assertTrue(result.details().length() > 0);
- assertTrue(result.details().contains("DISK"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
deleted file mode 100644
index b1d878e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaFilterTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.Quota;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IQuota;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class QuotaFilterTest extends EasyMockTest {
- private static final int DEFAULT_TASKS_IN_QUOTA = 10;
- private static final String ROLE = "test";
- private static final String JOB_NAME = "test_job";
- private static final String ENV = "test_env";
- private static final IJobKey JOB_KEY = JobKeys.from(ROLE, ENV, JOB_NAME);
- private static final Query.Builder QUERY = Query.jobScoped(JOB_KEY).active();
- private static final IQuota QUOTA = IQuota.build(new Quota()
- .setNumCpus(1.0)
- .setRamMb(256L)
- .setDiskMb(512L));
- private static final ITaskConfig TASK_CONFIG = ITaskConfig.build(new TaskConfig()
- .setNumCpus(QUOTA.getNumCpus())
- .setRamMb(QUOTA.getRamMb())
- .setDiskMb(QUOTA.getDiskMb()));
- private static final IJobConfiguration JOB = IJobConfiguration.build(new JobConfiguration()
- .setKey(JOB_KEY.newBuilder())
- .setInstanceCount(1)
- .setTaskConfig(TASK_CONFIG.newBuilder()));
-
- private QuotaFilter quotaFilter;
-
- private QuotaManagerImpl quotaManager;
- private StorageTestUtil storageTestUtil;
- private QuotaComparisonResult quotaCompResult;
-
- @Before
- public void setUp() {
- quotaManager = createMock(QuotaManagerImpl.class);
- quotaCompResult = createMock(QuotaComparisonResult.class);
- storageTestUtil = new StorageTestUtil(this);
-
- quotaFilter = new QuotaFilter(quotaManager, storageTestUtil.storage);
- }
-
- @Test
- public void testNonProductionPasses() {
- JobConfiguration jobBuilder = JOB.newBuilder();
- jobBuilder.getTaskConfig().setProduction(false);
- IJobConfiguration job = IJobConfiguration.build(jobBuilder);
-
- control.replay();
-
- assertTrue(quotaFilter.filter(job).isPass());
- }
-
- @Test
- public void testCreateProductionJobChecksQuota() {
- JobConfiguration jobBuilder = JOB.newBuilder();
- jobBuilder.getTaskConfig().setProduction(true);
- IJobConfiguration job = IJobConfiguration.build(jobBuilder);
-
- storageTestUtil.expectOperations();
- storageTestUtil.expectTaskFetch(QUERY).times(2);
-
- expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
- expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
- expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
- expect(quotaCompResult.result()).andReturn(INSUFFICIENT_QUOTA);
- expect(quotaCompResult.details()).andReturn("Details");
-
- control.replay();
-
- assertTrue(quotaFilter.filter(job).isPass());
- assertFalse(quotaFilter.filter(job).isPass());
- }
-
- @Test
- public void testUpdateProductionJobChecksQuota() {
- JobConfiguration jobBuilder = JOB.newBuilder();
- jobBuilder.getTaskConfig().setProduction(true);
-
- storageTestUtil.expectOperations();
- storageTestUtil.expectTaskFetch(QUERY,
- IScheduledTask.build(new ScheduledTask().setAssignedTask(
- new AssignedTask().setTask(jobBuilder.getTaskConfig()))));
-
- expect(quotaManager.checkQuota(ROLE, IQuota.build(new Quota(0, 0, 0))))
- .andReturn(quotaCompResult);
- expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
-
- control.replay();
-
- assertTrue(quotaFilter.filter(IJobConfiguration.build(jobBuilder)).isPass());
- }
-
- @Test
- public void testIncreaseShardsExceedsQuota() {
- int numTasks = DEFAULT_TASKS_IN_QUOTA;
- int additionalTasks = 1;
-
- JobConfiguration jobBuilder = JOB.newBuilder().setInstanceCount(numTasks + additionalTasks);
- jobBuilder.getTaskConfig().setProduction(true);
-
- IScheduledTask[] scheduledTasks = new IScheduledTask[numTasks];
- for (int i = 0; i < numTasks; i++) {
- ScheduledTask builder = new ScheduledTask().setAssignedTask(
- new AssignedTask().setTask(jobBuilder.getTaskConfig()));
- builder.getAssignedTask().setInstanceId(i);
- scheduledTasks[i] = IScheduledTask.build(builder);
- }
-
- storageTestUtil.expectOperations();
- storageTestUtil.expectTaskFetch(QUERY, scheduledTasks);
-
- expect(quotaManager.checkQuota(ROLE, QUOTA)).andReturn(quotaCompResult);
- expect(quotaCompResult.result()).andReturn(INSUFFICIENT_QUOTA);
- expect(quotaCompResult.details()).andReturn("Details");
-
- control.replay();
-
- assertFalse(quotaFilter.filter(IJobConfiguration.build(jobBuilder)).isPass());
- }
-
- @Test
- public void testUpdateProductionTasksChecksQuota() {
- JobConfiguration jobBuilder = JOB.newBuilder();
- TaskConfig config = jobBuilder.getTaskConfig()
- .setProduction(true)
- .setOwner(new Identity(ROLE, "user"))
- .setEnvironment(ENV)
- .setJobName(JOB_NAME);
-
- storageTestUtil.expectOperations();
- storageTestUtil.expectTaskFetch(QUERY,
- IScheduledTask.build(new ScheduledTask().setAssignedTask(
- new AssignedTask().setTask(jobBuilder.getTaskConfig()))));
-
- expect(quotaManager.checkQuota(ROLE, IQuota.build(new Quota(0, 0, 0))))
- .andReturn(quotaCompResult);
- expect(quotaCompResult.result()).andReturn(SUFFICIENT_QUOTA);
-
- control.replay();
-
- assertTrue(quotaFilter.filter(ITaskConfig.build(config), 1).isPass());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUpdateProductionTasksFailsJobKeyCreation() {
- JobConfiguration jobBuilder = JOB.newBuilder();
- jobBuilder.getTaskConfig().setOwner(new Identity(ROLE, "user"));
- control.replay();
-
- quotaFilter.filter(ITaskConfig.build(jobBuilder.getTaskConfig()), 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index f971aa1..82e17df 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -25,26 +25,32 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
import org.apache.aurora.scheduler.storage.entities.IQuota;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.INSUFFICIENT_QUOTA;
-import static org.apache.aurora.scheduler.quota.QuotaComparisonResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class QuotaManagerImplTest extends EasyMockTest {
- private static final String ROLE = "foo";
+ private static final String ROLE = "test";
+ private static final String ENV = "test_env";
+ private static final IQuota QUOTA = IQuota.build(new Quota()
+ .setNumCpus(1.0)
+ .setRamMb(100L)
+ .setDiskMb(200L));
private static final Query.Builder ACTIVE_QUERY = Query.roleScoped(ROLE).active();
private StorageTestUtil storageUtil;
- // TODO(maximk): Move checkQuota to QuotaFilter along with tests.
private QuotaManagerImpl quotaManager;
@Before
@@ -54,125 +60,172 @@ public class QuotaManagerImplTest extends EasyMockTest {
}
@Test
- public void testGetEmptyQuota() {
+ public void testGetQuotaInfo() {
+ IScheduledTask prodTask = createTask("foo", "id1", 3, 3, 3, true);
+ IScheduledTask nonProdTask = createTask("bar", "id1", 2, 2, 2, false);
+ IQuota quota = IQuota.build(new Quota(4, 4, 4));
+
+ expectQuota(quota);
+ expectTasks(prodTask, nonProdTask);
storageUtil.expectOperations();
- returnNoTasks();
control.replay();
- assertEquals(Quotas.noQuota(), quotaManager.getConsumption(ROLE));
+ QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
+ assertEquals(quota, quotaInfo.guota());
+ assertEquals(IQuota.build(new Quota(3, 3, 3)), quotaInfo.prodConsumption());
+ assertEquals(IQuota.build(new Quota(2, 2, 2)), quotaInfo.nonProdConsumption());
}
@Test
- public void testConsumeNoQuota() {
+ public void testGetQuotaInfoNoTasks() {
+ IQuota quota = IQuota.build(new Quota(4, 4, 4));
+
+ expectQuota(quota);
+ expectNoTasks();
storageUtil.expectOperations();
- applyQuota(new Quota(1, 1, 1));
- returnNoTasks();
control.replay();
- assertEquals(SUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, Quotas.noQuota()).result());
+ QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ROLE);
+ assertEquals(quota, quotaInfo.guota());
+ assertEquals(Quotas.noQuota(), quotaInfo.prodConsumption());
+ assertEquals(Quotas.noQuota(), quotaInfo.nonProdConsumption());
}
@Test
- public void testNoQuotaExhausted() {
+ public void testCheckQuotaPasses() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectTasks(createTask("foo", "id1", 3, 3, 3, true));
storageUtil.expectOperations();
- returnNoTasks();
- expect(storageUtil.quotaStore.fetchQuota(ROLE)).andReturn(Optional.<IQuota>absent());
control.replay();
- QuotaComparisonResult result =
- quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 1, 1)));
-
- assertEquals(INSUFFICIENT_QUOTA, result.result());
- assertTrue(result.details().length() > 0);
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+ assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
}
@Test
- public void testUseAllQuota() {
- IScheduledTask task1 = createTask("foo", "id1", 1, 1, 1);
- IScheduledTask task2 = createTask("foo", "id2", 1, 1, 1);
-
+ public void testCheckQuotaPassesNoTasks() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectNoTasks();
storageUtil.expectOperations();
- applyQuota(new Quota(2, 2, 2)).anyTimes();
- returnTasks(task1);
- returnTasks(task1, task2);
control.replay();
- IQuota half = IQuota.build(new Quota(1, 1, 1));
- assertEquals(SUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, half).result());
- assertEquals(INSUFFICIENT_QUOTA, quotaManager.checkQuota(ROLE, half).result());
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+ assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
}
@Test
- public void testExhaustCpu() {
+ public void testCheckQuotaPassesNonProdUnaccounted() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectTasks(createTask("foo", "id1", 3, 3, 3, true), createTask("bar", "id2", 5, 5, 5, false));
storageUtil.expectOperations();
- applyQuota(new Quota(2, 2, 2));
- returnTasks(createTask("foo", "id1", 1, 1, 1));
control.replay();
- assertEquals(
- INSUFFICIENT_QUOTA,
- quotaManager.checkQuota(ROLE, IQuota.build(new Quota(2, 1, 1))).result());
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+ assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
}
@Test
- public void testExhaustRam() {
+ public void testCheckQuotaSkippedForNonProdRequest() {
+ control.replay();
+
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, false), 1);
+ assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+ }
+
+ @Test
+ public void testCheckQuotaNoQuotaSet() {
+ expect(storageUtil.quotaStore.fetchQuota(ROLE)).andReturn(Optional.<IQuota>absent());
+ expectNoTasks();
storageUtil.expectOperations();
- applyQuota(new Quota(2, 2, 2));
- returnTasks(createTask("foo", "id1", 1, 1, 1));
control.replay();
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 1, true), 1);
+ assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+ }
+
+ @Test
+ public void testCheckQuotaExceedsCpu() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+ storageUtil.expectOperations();
- assertEquals(
- INSUFFICIENT_QUOTA,
- quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 2, 1))).result());
+ control.replay();
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(2, 1, 1, true), 1);
+ assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+ assertTrue(checkQuota.getDetails().get().contains("CPU"));
}
@Test
- public void testExhaustDisk() {
+ public void testCheckQuotaExceedsRam() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectTasks(createTask("foo", "id1", 3, 3, 3, true));
storageUtil.expectOperations();
- applyQuota(new Quota(2, 2, 2));
- returnTasks(createTask("foo", "id1", 1, 1, 1));
control.replay();
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 2, 1, true), 1);
+ assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+ assertTrue(checkQuota.getDetails().get().contains("RAM"));
+ }
- assertEquals(
- INSUFFICIENT_QUOTA,
- quotaManager.checkQuota(ROLE, IQuota.build(new Quota(1, 1, 2))).result());
+ @Test
+ public void testCheckQuotaExceedsDisk() {
+ expectQuota(IQuota.build(new Quota(4, 4, 4)));
+ expectTasks(createTask("foo", "id1", 3, 3, 3, true));
+ storageUtil.expectOperations();
+
+ control.replay();
+ QuotaCheckResult checkQuota = quotaManager.checkQuota(createTaskConfig(1, 1, 2, true), 1);
+ assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+ assertTrue(checkQuota.getDetails().get().contains("DISK"));
}
@Test
- public void testNonproductionUnaccounted() {
- ScheduledTask builder = createTask("foo", "id1", 3, 3, 3).newBuilder();
- builder.getAssignedTask().getTask().setProduction(false);
- IScheduledTask task = IScheduledTask.build(builder);
+ public void testSaveQuotaPasses() throws Exception {
+ storageUtil.quotaStore.saveQuota(ROLE, QUOTA);
+ storageUtil.expectOperations();
+
+ control.replay();
+ quotaManager.saveQuota(ROLE, QUOTA);
+ }
+ @Test(expected = QuotaException.class)
+ public void testSaveQuotaFailsMissingSpecs() throws Exception {
storageUtil.expectOperations();
- applyQuota(new Quota(2, 2, 2));
- returnTasks(task);
control.replay();
+ quotaManager.saveQuota(ROLE, IQuota.build(new Quota()));
+ }
+
+ @Test(expected = QuotaException.class)
+ public void testSaveQuotaFailsNegativeValues() throws Exception {
+ storageUtil.expectOperations();
- assertEquals(
- SUFFICIENT_QUOTA,
- quotaManager.checkQuota(ROLE, IQuota.build(new Quota(2, 2, 2))).result());
+ control.replay();
+ quotaManager.saveQuota(ROLE, IQuota.build(new Quota(-2.0, 4, 5)));
}
- private IExpectationSetters<?> returnTasks(IScheduledTask... tasks) {
+ private IExpectationSetters<?> expectTasks(IScheduledTask... tasks) {
return storageUtil.expectTaskFetch(ACTIVE_QUERY, tasks);
}
- private IExpectationSetters<?> returnNoTasks() {
- return returnTasks();
+ private IExpectationSetters<?> expectNoTasks() {
+ return expectTasks();
}
- private IExpectationSetters<Optional<IQuota>> applyQuota(Quota quota) {
+ private IExpectationSetters<Optional<IQuota>> expectQuota(IQuota quota) {
return expect(storageUtil.quotaStore.fetchQuota(ROLE))
- .andReturn(Optional.of(IQuota.build(quota)));
+ .andReturn(Optional.of(quota));
+ }
+
+ private ITaskConfig createTaskConfig(int cpus, int ramMb, int diskMb, boolean production) {
+ return createTask("newTask", "newId", cpus, ramMb, diskMb, production)
+ .getAssignedTask()
+ .getTask();
}
private IScheduledTask createTask(
@@ -180,23 +233,21 @@ public class QuotaManagerImplTest extends EasyMockTest {
String taskId,
int cpus,
int ramMb,
- int diskMb) {
+ int diskMb,
+ boolean production) {
return IScheduledTask.build(new ScheduledTask()
.setStatus(ScheduleStatus.RUNNING)
.setAssignedTask(
new AssignedTask()
.setTaskId(taskId)
- .setTask(createTaskConfig(jobName, cpus, ramMb, diskMb))));
- }
-
- private TaskConfig createTaskConfig(String jobName, int cpus, int ramMb, int diskMb) {
- return new TaskConfig()
- .setOwner(new Identity(ROLE, ROLE))
- .setJobName(jobName)
- .setNumCpus(cpus)
- .setRamMb(ramMb)
- .setDiskMb(diskMb)
- .setProduction(true);
+ .setTask(new TaskConfig()
+ .setOwner(new Identity(ROLE, ROLE))
+ .setEnvironment(ENV)
+ .setJobName(jobName)
+ .setNumCpus(cpus)
+ .setRamMb(ramMb)
+ .setDiskMb(diskMb)
+ .setProduction(production))));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e2b357e7/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 4eeed38..9291fe6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -70,7 +70,8 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
import org.apache.aurora.scheduler.cron.CronScheduler;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.JobFilter.JobFilterResult;
+import org.apache.aurora.scheduler.quota.QuotaCheckResult;
+import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -100,6 +101,8 @@ import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.hostLimitConstraint;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.validateAndPopulate;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
@@ -130,6 +133,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
private static final String SLAVE_HOST_1 = "SlaveHost1";
+ private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
+ private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
+
private Driver driver;
private StateManagerImpl stateManager;
private Storage storage;
@@ -140,7 +146,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private EventSink eventSink;
private RescheduleCalculator rescheduleCalculator;
private ShutdownRegistry shutdownRegistry;
- private JobFilter jobFilter;
+ private QuotaManager quotaManager;
// TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
private final AtomicLong idCounter = new AtomicLong();
@@ -155,19 +161,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
driver = createMock(Driver.class);
clock = new FakeClock();
eventSink = createMock(EventSink.class);
- eventSink.post(EasyMock.<PubsubEvent>anyObject());
rescheduleCalculator = createMock(RescheduleCalculator.class);
cronScheduler = createMock(CronScheduler.class);
shutdownRegistry = createMock(ShutdownRegistry.class);
- jobFilter = createMock(JobFilter.class);
- expectLastCall().anyTimes();
+ quotaManager = createMock(QuotaManager.class);
+ eventSink.post(EasyMock.<PubsubEvent>anyObject());
+ expectLastCall().anyTimes();
expect(cronScheduler.schedule(anyObject(String.class), anyObject(Runnable.class)))
.andStubReturn("key");
expect(cronScheduler.isValidSchedule(anyObject(String.class))).andStubReturn(true);
- expect(jobFilter.filter(anyObject(ITaskConfig.class), anyInt())).andStubReturn(
- JobFilterResult.pass());
+ expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+ .andStubReturn(ENOUGH_QUOTA);
}
/**
@@ -187,8 +193,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private void buildScheduler(Storage newStorage) throws Exception {
this.storage = newStorage;
storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
StorageBackfill.backfill(storeProvider, clock);
}
});
@@ -208,7 +213,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
immediateManager,
stateManager,
taskIdGenerator,
- jobFilter);
+ quotaManager);
cron.schedulerCore = scheduler;
immediateManager.schedulerCore = scheduler;
}
@@ -1164,8 +1169,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testEnsureCanAddInstances() throws Exception {
SanitizedConfiguration job = makeJob(KEY_A, 1);
- expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
- .andReturn(JobFilterResult.pass());
control.replay();
buildScheduler();
@@ -1176,8 +1179,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testEnsureCanAddInstancesFails() throws Exception {
SanitizedConfiguration job = makeJob(KEY_A, 1);
- expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
- .andReturn(JobFilterResult.fail("fail"));
+ expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+ .andReturn(NOT_ENOUGH_QUOTA);
control.replay();
buildScheduler();
@@ -1216,8 +1219,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testFilterFailRejectsCreate() throws Exception {
SanitizedConfiguration job = makeJob(KEY_A, 1);
- expect(jobFilter.filter(job.getJobConfig().getTaskConfig(), 1))
- .andReturn(JobFilterResult.fail("failed"));
+ expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+ .andReturn(NOT_ENOUGH_QUOTA);
control.replay();
@@ -1228,7 +1231,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testFilterFailRejectsAddInstances() throws Exception {
IJobConfiguration job = makeJob(KEY_A, 1).getJobConfig();
- expect(jobFilter.filter(job.getTaskConfig(), 1)).andReturn(JobFilterResult.fail("failed"));
+ expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
+ .andReturn(NOT_ENOUGH_QUOTA);
control.replay();
@@ -1258,7 +1262,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
.setEnvironment(ENV_A)
.setJobName(KEY_A.getName())
.setOwner(OWNER_A);
- expect(jobFilter.filter(ITaskConfig.build(newTask), 2)).andReturn(JobFilterResult.pass());
ImmutableSet<Integer> instances = ImmutableSet.of(1);
control.replay();