You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/01/08 02:05:34 UTC
incubator-aurora git commit: Removing cron schedule support from
createJob and killTasks
Repository: incubator-aurora
Updated Branches:
refs/heads/master 7449e34d8 -> fb4d3f9ac
Removing cron schedule support from createJob and killTasks
Bugs closed: AURORA-454, AURORA-976
Reviewed at https://reviews.apache.org/r/29271/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/fb4d3f9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/fb4d3f9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/fb4d3f9a
Branch: refs/heads/master
Commit: fb4d3f9ac09eda54fd5259ef8b84669fa8cff864
Parents: 7449e34
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Jan 7 17:01:36 2015 -0800
Committer: -l <ma...@apache.org>
Committed: Wed Jan 7 17:01:36 2015 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/base/Query.java | 17 ---
.../thrift/SchedulerThriftInterface.java | 138 ++++++++++---------
.../thrift/SchedulerThriftInterfaceTest.java | 85 +++++-------
3 files changed, 109 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index a6ff14a..458530f 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -15,12 +15,9 @@ package org.apache.aurora.scheduler.base;
import java.util.EnumSet;
import java.util.Objects;
-import java.util.Set;
-import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import org.apache.aurora.gen.ScheduleStatus;
@@ -56,20 +53,6 @@ public final class Query {
return q.isSetRole() && q.isSetEnvironment() && q.isSetJobName() || q.isSetJobKeys();
}
- /**
- * Checks whether a query is strictly scoped to a specific job. A query is strictly job scoped,
- * iff the only fields that are set in the query are: role, environment and job name.
- *
- * @param query Query to test.
- * @return {@code true} if the query is strictly single job scoped, otherwise {@code false}.
- */
- public static boolean isSingleJobScoped(Builder query) {
- Optional<Set<IJobKey>> jobKey = JobKeys.from(query);
- return jobKey.isPresent()
- && jobKey.get().size() == 1
- && Query.jobScoped(Iterables.getOnlyElement(jobKey.get())).equals(query);
- }
-
public static Builder arbitrary(TaskQuery query) {
return new Builder(query.deepCopy());
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 6e3caa1..ac92959 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -286,6 +286,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
return errorResponse(INVALID_REQUEST, e);
}
+ if (sanitized.isCron()) {
+ return invalidResponse(NO_CRON);
+ }
+
return storage.write(new MutateWork.Quiet<Response>() {
@Override
public Response apply(MutableStoreProvider storeProvider) {
@@ -296,84 +300,97 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
ILockKey.build(LockKey.job(job.getKey().newBuilder())),
Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- if (!storeProvider.getTaskStore().fetchTasks(
- Query.jobScoped(job.getKey()).active()).isEmpty()
- || cronJobManager.hasJob(job.getKey())) {
-
- return invalidResponse("Job already exists: " + JobKeys.canonicalString(job.getKey()));
- }
+ checkJobExists(storeProvider, job.getKey());
ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
int count = sanitized.getJobConfig().getInstanceCount();
validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
- // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454)
- if (sanitized.isCron()) {
- LOG.warning("Deprecated behavior: scheduling job " + job.getKey()
- + " with cron via createJob (AURORA_454)");
- cronJobManager.createJob(SanitizedCronJob.from(sanitized));
- } else {
- LOG.info("Launching " + count + " tasks.");
- stateManager.insertPendingTasks(
- storeProvider,
- template,
- sanitized.getInstanceIds());
- }
+ LOG.info("Launching " + count + " tasks.");
+ stateManager.insertPendingTasks(
+ storeProvider,
+ template,
+ sanitized.getInstanceIds());
+
return okEmptyResponse();
} catch (LockException e) {
return errorResponse(LOCK_ERROR, e);
- } catch (CronException | TaskValidationException e) {
+ } catch (JobExistsException | TaskValidationException e) {
return errorResponse(INVALID_REQUEST, e);
}
}
});
}
+ private static class JobExistsException extends Exception {
+ public JobExistsException(String message) {
+ super(message);
+ }
+ }
+
+ private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException {
+ if (!store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()).isEmpty()
+ || cronJobManager.hasJob(jobKey)) {
+
+ throw new JobExistsException(jobAlreadyExistsMessage(jobKey));
+ }
+ }
+
private Response createOrUpdateCronTemplate(
JobConfiguration mutableJob,
- @Nullable Lock mutableLock,
+ @Nullable final Lock mutableLock,
SessionKey session,
- boolean updateOnly) {
+ final boolean updateOnly) {
IJobConfiguration job = IJobConfiguration.build(mutableJob);
- IJobKey jobKey = JobKeys.assertValid(job.getKey());
+ final IJobKey jobKey = JobKeys.assertValid(job.getKey());
requireNonNull(session);
+ final SanitizedConfiguration sanitized;
try {
sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+ sanitized = SanitizedConfiguration.fromUnsanitized(job);
+ } catch (AuthFailedException e) {
+ return errorResponse(AUTH_FAILED, e);
+ } catch (TaskDescriptionException e) {
+ return errorResponse(INVALID_REQUEST, e);
+ }
- SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+ if (!sanitized.isCron()) {
+ return invalidResponse(noCronScheduleMessage(jobKey));
+ }
- lockManager.validateIfLocked(
- ILockKey.build(LockKey.job(jobKey.newBuilder())),
- Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+ return storage.write(new MutateWork.Quiet<Response>() {
+ @Override
+ public Response apply(MutableStoreProvider storeProvider) {
+ try {
+ lockManager.validateIfLocked(
+ ILockKey.build(LockKey.job(jobKey.newBuilder())),
+ Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- if (!sanitized.isCron()) {
- return invalidResponse(noCronScheduleMessage(jobKey));
- }
+ ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
+ int count = sanitized.getJobConfig().getInstanceCount();
- ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
- int count = sanitized.getJobConfig().getInstanceCount();
+ validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
- validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
+ // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob
+ if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) {
+ // The job already has a schedule: so update it.
+ cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
+ } else {
+ checkJobExists(storeProvider, jobKey);
+ cronJobManager.createJob(SanitizedCronJob.from(sanitized));
+ }
- // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob
- if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) {
- // The job already has a schedule: so update it.
- cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
- } else {
- cronJobManager.createJob(SanitizedCronJob.from(sanitized));
+ return okEmptyResponse();
+ } catch (LockException e) {
+ return errorResponse(LOCK_ERROR, e);
+ } catch (JobExistsException | TaskValidationException | CronException e) {
+ return errorResponse(INVALID_REQUEST, e);
+ }
}
-
- return okEmptyResponse();
- } catch (AuthFailedException e) {
- return errorResponse(AUTH_FAILED, e);
- } catch (LockException e) {
- return errorResponse(LOCK_ERROR, e);
- } catch (TaskDescriptionException | TaskValidationException | CronException e) {
- return errorResponse(INVALID_REQUEST, e);
- }
+ });
}
@Override
@@ -736,9 +753,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
public Response apply(MutableStoreProvider storeProvider) {
Query.Builder query = Query.arbitrary(mutableQuery);
- // Check single job scoping before adding statuses.
- boolean isSingleJobScoped = Query.isSingleJobScoped(query);
-
// Unless statuses were specifically supplied, only attempt to kill active tasks.
query = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
@@ -767,19 +781,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
LOG.info("Killing tasks matching " + query);
- final boolean cronJobKilled;
- if (isSingleJobScoped) {
- // If this looks like a query for all tasks in a job, instruct the cron
- // scheduler to delete it.
- // TODO(mchucarroll): deprecate cron as a part of create/kill job. (AURORA-454)
- IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
- LOG.warning("Deprecated behavior: descheduling job " + jobKey
- + " with cron via killTasks. (See AURORA-454)");
- cronJobKilled = cronJobManager.deleteJob(jobKey);
- } else {
- cronJobKilled = false;
- }
-
boolean tasksKilled = false;
for (String taskId : Tasks.ids(tasks)) {
tasksKilled |= stateManager.changeState(
@@ -790,7 +791,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
killedByMessage(context.getIdentity()));
}
- return cronJobKilled || tasksKilled
+ return tasksKilled
? okEmptyResponse()
: addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
}
@@ -1373,7 +1374,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder()));
if (cronJobManager.hasJob(job)) {
- return invalidResponse(NO_CRON_UPDATES);
+ return invalidResponse(NO_CRON);
}
} catch (AuthFailedException e) {
return errorResponse(AUTH_FAILED, e);
@@ -1562,13 +1563,18 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
}
@VisibleForTesting
+ static String jobAlreadyExistsMessage(IJobKey jobKey) {
+ return String.format("Job %s already exists", JobKeys.canonicalString(jobKey));
+ }
+
+ @VisibleForTesting
static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";
@VisibleForTesting
static final String NOOP_JOB_UPDATE_MESSAGE = "Job is unchanged by proposed update.";
@VisibleForTesting
- static final String NO_CRON_UPDATES = "Cron jobs may only be updated by calling scheduleCronJob.";
+ static final String NO_CRON = "Cron jobs may only be created/updated by calling scheduleCronJob.";
private static Response okEmptyResponse() {
return emptyResponse().setResponseCode(OK);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 8d41e70..ad9126c 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -169,7 +169,8 @@ import static org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryExcept
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASKS_PER_JOB;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASK_ID_LENGTH;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_JOB_UPDATE_MESSAGE;
-import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON_UPDATES;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.jobAlreadyExistsMessage;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.noCronScheduleMessage;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.notScheduledCronMessage;
@@ -387,42 +388,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
- public void testCreateCronJob() throws Exception {
- IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule(CRON_SCHEDULE));
- SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
- expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
- expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
- expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
- .andReturn(TASK_ID);
- expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
- .andReturn(ENOUGH_QUOTA);
-
- cronJobManager.createJob(anyObject(SanitizedCronJob.class));
-
- control.replay();
-
- assertOkResponse(thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION));
- }
-
- @Test
- public void testRejectCronJobEmptyCronSchedule() throws Exception {
+ public void testCreateJobFailsForCron() throws Exception {
IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule(""));
- SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
- expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
- expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
- .andReturn(TASK_ID);
- expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
- .andReturn(ENOUGH_QUOTA);
control.replay();
assertEquals(
- invalidResponse(SanitizedCronJob.NO_CRON_SCHEDULE),
+ invalidResponse(NO_CRON),
thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION));
}
@@ -799,7 +772,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
Query.Builder query = Query.unscoped().byJob(JOB_KEY);
expectAuth(ROOT, true);
storageUtil.expectTaskFetch(query.active(), buildScheduledTask());
- expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(false);
lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
expectTransitionsToKilling();
@@ -809,18 +781,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
- public void testKillCronJob() throws Exception {
- Query.Builder query = Query.jobScoped(JOB_KEY);
- expectAuth(ROOT, true);
- storageUtil.expectTaskFetch(query.active());
- expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(true);
-
- control.replay();
-
- assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
- }
-
- @Test
public void testKillTasksLockCheckFailed() throws Exception {
Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
IScheduledTask task2 = buildScheduledTask("job_bar", TASK_ID);
@@ -900,7 +860,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
storageUtil.expectTaskFetch(query.active());
expectAuth(ImmutableSet.of("role"), true);
- expect(cronJobManager.deleteJob(key)).andReturn(true);
control.replay();
assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
@@ -1273,13 +1232,34 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
.andReturn(ENOUGH_QUOTA);
- expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+ expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false).times(2);
+ storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
cronJobManager.createJob(SanitizedCronJob.from(sanitized));
control.replay();
assertResponse(OK, thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION));
}
@Test
+ public void testScheduleCronFailsCreationDueToExistingNonCron() throws Exception {
+ expectAuth(ROLE, true);
+ lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+ SanitizedConfiguration sanitized =
+ SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB));
+
+ expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
+ .andReturn(TASK_ID);
+ expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+ .andReturn(ENOUGH_QUOTA);
+
+ expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+ storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), buildScheduledTask());
+ control.replay();
+ assertEquals(
+ invalidResponse(jobAlreadyExistsMessage(JOB_KEY)),
+ thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION));
+ }
+
+ @Test
public void testScheduleCronUpdatesJob() throws Exception {
expectAuth(ROLE, true);
lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
@@ -1309,6 +1289,16 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
+ public void testScheduleCronJobFailedTaskConfigValidation() throws Exception {
+ expectAuth(ROLE, true);
+ control.replay();
+ IJobConfiguration job = IJobConfiguration.build(makeJob(null));
+ assertResponse(
+ INVALID_REQUEST,
+ thrift.scheduleCronJob(job.newBuilder(), DEFAULT_LOCK, SESSION));
+ }
+
+ @Test
public void testScheduleCronJobFailsLockValidation() throws Exception {
expectAuth(ROLE, true);
lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
@@ -1320,7 +1310,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
@Test
public void testScheduleCronJobFailsWithNoCronSchedule() throws Exception {
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
control.replay();
assertEquals(
@@ -2696,7 +2685,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
expect(cronJobManager.hasJob(JOB_KEY)).andReturn(true);
control.replay();
- assertEquals(invalidResponse(NO_CRON_UPDATES), thrift.startJobUpdate(request, SESSION));
+ assertEquals(invalidResponse(NO_CRON), thrift.startJobUpdate(request, SESSION));
}
@Test