You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/01/08 02:05:34 UTC

incubator-aurora git commit: Removing cron schedule support from createJob and killTasks

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 7449e34d8 -> fb4d3f9ac


Removing cron schedule support from createJob and killTasks

Bugs closed: AURORA-454, AURORA-976

Reviewed at https://reviews.apache.org/r/29271/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/fb4d3f9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/fb4d3f9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/fb4d3f9a

Branch: refs/heads/master
Commit: fb4d3f9ac09eda54fd5259ef8b84669fa8cff864
Parents: 7449e34
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Jan 7 17:01:36 2015 -0800
Committer: -l <ma...@apache.org>
Committed: Wed Jan 7 17:01:36 2015 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/base/Query.java |  17 ---
 .../thrift/SchedulerThriftInterface.java        | 138 ++++++++++---------
 .../thrift/SchedulerThriftInterfaceTest.java    |  85 +++++-------
 3 files changed, 109 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index a6ff14a..458530f 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -15,12 +15,9 @@ package org.apache.aurora.scheduler.base;
 
 import java.util.EnumSet;
 import java.util.Objects;
-import java.util.Set;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 
 import org.apache.aurora.gen.ScheduleStatus;
@@ -56,20 +53,6 @@ public final class Query {
     return q.isSetRole() && q.isSetEnvironment() && q.isSetJobName() || q.isSetJobKeys();
   }
 
-  /**
-   * Checks whether a query is strictly scoped to a specific job. A query is strictly job scoped,
-   * iff the only fields that are set in the query are: role, environment and job name.
-   *
-   * @param query Query to test.
-   * @return {@code true} if the query is strictly single job scoped, otherwise {@code false}.
-   */
-  public static boolean isSingleJobScoped(Builder query) {
-    Optional<Set<IJobKey>> jobKey = JobKeys.from(query);
-    return jobKey.isPresent()
-        && jobKey.get().size() == 1
-        && Query.jobScoped(Iterables.getOnlyElement(jobKey.get())).equals(query);
-  }
-
   public static Builder arbitrary(TaskQuery query) {
     return new Builder(query.deepCopy());
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 6e3caa1..ac92959 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -286,6 +286,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return errorResponse(INVALID_REQUEST, e);
     }
 
+    if (sanitized.isCron()) {
+      return invalidResponse(NO_CRON);
+    }
+
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
@@ -296,84 +300,97 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               ILockKey.build(LockKey.job(job.getKey().newBuilder())),
               Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-          if (!storeProvider.getTaskStore().fetchTasks(
-              Query.jobScoped(job.getKey()).active()).isEmpty()
-              || cronJobManager.hasJob(job.getKey())) {
-
-            return invalidResponse("Job already exists: " + JobKeys.canonicalString(job.getKey()));
-          }
+          checkJobExists(storeProvider, job.getKey());
 
           ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
           int count = sanitized.getJobConfig().getInstanceCount();
 
           validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
 
-          // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454)
-          if (sanitized.isCron()) {
-            LOG.warning("Deprecated behavior: scheduling job " + job.getKey()
-                + " with cron via createJob (AURORA_454)");
-            cronJobManager.createJob(SanitizedCronJob.from(sanitized));
-          } else {
-            LOG.info("Launching " + count + " tasks.");
-            stateManager.insertPendingTasks(
-                storeProvider,
-                template,
-                sanitized.getInstanceIds());
-          }
+          LOG.info("Launching " + count + " tasks.");
+          stateManager.insertPendingTasks(
+              storeProvider,
+              template,
+              sanitized.getInstanceIds());
+
           return okEmptyResponse();
         } catch (LockException e) {
           return errorResponse(LOCK_ERROR, e);
-        } catch (CronException | TaskValidationException e) {
+        } catch (JobExistsException | TaskValidationException e) {
           return errorResponse(INVALID_REQUEST, e);
         }
       }
     });
   }
 
+  private static class JobExistsException extends Exception {
+    public JobExistsException(String message) {
+      super(message);
+    }
+  }
+
+  private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException {
+    if (!store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()).isEmpty()
+        || cronJobManager.hasJob(jobKey)) {
+
+      throw new JobExistsException(jobAlreadyExistsMessage(jobKey));
+    }
+  }
+
   private Response createOrUpdateCronTemplate(
       JobConfiguration mutableJob,
-      @Nullable Lock mutableLock,
+      @Nullable final Lock mutableLock,
       SessionKey session,
-      boolean updateOnly) {
+      final boolean updateOnly) {
 
     IJobConfiguration job = IJobConfiguration.build(mutableJob);
-    IJobKey jobKey = JobKeys.assertValid(job.getKey());
+    final IJobKey jobKey = JobKeys.assertValid(job.getKey());
     requireNonNull(session);
 
+    final SanitizedConfiguration sanitized;
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+      sanitized = SanitizedConfiguration.fromUnsanitized(job);
+    } catch (AuthFailedException e) {
+      return errorResponse(AUTH_FAILED, e);
+    } catch (TaskDescriptionException e) {
+      return errorResponse(INVALID_REQUEST, e);
+    }
 
-      SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
+    if (!sanitized.isCron()) {
+      return invalidResponse(noCronScheduleMessage(jobKey));
+    }
 
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          lockManager.validateIfLocked(
+              ILockKey.build(LockKey.job(jobKey.newBuilder())),
+              Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-      if (!sanitized.isCron()) {
-        return invalidResponse(noCronScheduleMessage(jobKey));
-      }
+          ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
+          int count = sanitized.getJobConfig().getInstanceCount();
 
-      ITaskConfig template = sanitized.getJobConfig().getTaskConfig();
-      int count = sanitized.getJobConfig().getInstanceCount();
+          validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
 
-      validateTaskLimits(template, count, quotaManager.checkInstanceAddition(template, count));
+          // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob
+          if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) {
+            // The job already has a schedule: so update it.
+            cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
+          } else {
+            checkJobExists(storeProvider, jobKey);
+            cronJobManager.createJob(SanitizedCronJob.from(sanitized));
+          }
 
-      // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob
-      if (updateOnly || cronJobManager.hasJob(sanitized.getJobConfig().getKey())) {
-        // The job already has a schedule: so update it.
-        cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
-      } else {
-        cronJobManager.createJob(SanitizedCronJob.from(sanitized));
+          return okEmptyResponse();
+        } catch (LockException e) {
+          return errorResponse(LOCK_ERROR, e);
+        } catch (JobExistsException | TaskValidationException | CronException e) {
+          return errorResponse(INVALID_REQUEST, e);
+        }
       }
-
-      return okEmptyResponse();
-    } catch (AuthFailedException e) {
-      return errorResponse(AUTH_FAILED, e);
-    } catch (LockException e) {
-      return errorResponse(LOCK_ERROR, e);
-    } catch (TaskDescriptionException | TaskValidationException | CronException e) {
-      return errorResponse(INVALID_REQUEST, e);
-    }
+    });
   }
 
   @Override
@@ -736,9 +753,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       public Response apply(MutableStoreProvider storeProvider) {
         Query.Builder query = Query.arbitrary(mutableQuery);
 
-        // Check single job scoping before adding statuses.
-        boolean isSingleJobScoped = Query.isSingleJobScoped(query);
-
         // Unless statuses were specifically supplied, only attempt to kill active tasks.
         query = query.get().isSetStatuses() ? query : query.byStatus(ACTIVE_STATES);
 
@@ -767,19 +781,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
         LOG.info("Killing tasks matching " + query);
 
-        final boolean cronJobKilled;
-        if (isSingleJobScoped) {
-          // If this looks like a query for all tasks in a job, instruct the cron
-          // scheduler to delete it.
-          // TODO(mchucarroll): deprecate cron as a part of create/kill job.  (AURORA-454)
-          IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
-          LOG.warning("Deprecated behavior: descheduling job " + jobKey
-              + " with cron via killTasks. (See AURORA-454)");
-          cronJobKilled = cronJobManager.deleteJob(jobKey);
-        } else {
-          cronJobKilled = false;
-        }
-
         boolean tasksKilled = false;
         for (String taskId : Tasks.ids(tasks)) {
           tasksKilled |= stateManager.changeState(
@@ -790,7 +791,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               killedByMessage(context.getIdentity()));
         }
 
-        return cronJobKilled || tasksKilled
+        return tasksKilled
             ? okEmptyResponse()
             : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
       }
@@ -1373,7 +1374,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               ITaskConfig.build(mutableRequest.getTaskConfig())).newBuilder()));
 
       if (cronJobManager.hasJob(job)) {
-        return invalidResponse(NO_CRON_UPDATES);
+        return invalidResponse(NO_CRON);
       }
     } catch (AuthFailedException e) {
       return errorResponse(AUTH_FAILED, e);
@@ -1562,13 +1563,18 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @VisibleForTesting
+  static String jobAlreadyExistsMessage(IJobKey jobKey) {
+    return String.format("Job %s already exists", JobKeys.canonicalString(jobKey));
+  }
+
+  @VisibleForTesting
   static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";
 
   @VisibleForTesting
   static final String NOOP_JOB_UPDATE_MESSAGE = "Job is unchanged by proposed update.";
 
   @VisibleForTesting
-  static final String NO_CRON_UPDATES = "Cron jobs may only be updated by calling scheduleCronJob.";
+  static final String NO_CRON = "Cron jobs may only be created/updated by calling scheduleCronJob.";
 
   private static Response okEmptyResponse()  {
     return emptyResponse().setResponseCode(OK);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fb4d3f9a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 8d41e70..ad9126c 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -169,7 +169,8 @@ import static org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryExcept
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASKS_PER_JOB;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TASK_ID_LENGTH;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NOOP_JOB_UPDATE_MESSAGE;
-import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON_UPDATES;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.NO_CRON;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.jobAlreadyExistsMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.noCronScheduleMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.notScheduledCronMessage;
@@ -387,42 +388,14 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testCreateCronJob() throws Exception {
-    IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule(CRON_SCHEDULE));
-    SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
-    expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
-    expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
-        .andReturn(TASK_ID);
-    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
-        .andReturn(ENOUGH_QUOTA);
-
-    cronJobManager.createJob(anyObject(SanitizedCronJob.class));
-
-    control.replay();
-
-    assertOkResponse(thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION));
-  }
-
-  @Test
-  public void testRejectCronJobEmptyCronSchedule() throws Exception {
+  public void testCreateJobFailsForCron() throws Exception {
     IJobConfiguration job = IJobConfiguration.build(makeProdJob().setCronSchedule(""));
-    SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job);
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
-    expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
-        .andReturn(TASK_ID);
-    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
-        .andReturn(ENOUGH_QUOTA);
 
     control.replay();
 
     assertEquals(
-        invalidResponse(SanitizedCronJob.NO_CRON_SCHEDULE),
+        invalidResponse(NO_CRON),
         thrift.createJob(job.newBuilder(), LOCK.newBuilder(), SESSION));
   }
 
@@ -799,7 +772,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY);
     expectAuth(ROOT, true);
     storageUtil.expectTaskFetch(query.active(), buildScheduledTask());
-    expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
     expectTransitionsToKilling();
 
@@ -809,18 +781,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillCronJob() throws Exception {
-    Query.Builder query = Query.jobScoped(JOB_KEY);
-    expectAuth(ROOT, true);
-    storageUtil.expectTaskFetch(query.active());
-    expect(cronJobManager.deleteJob(JOB_KEY)).andReturn(true);
-
-    control.replay();
-
-    assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
-  }
-
-  @Test
   public void testKillTasksLockCheckFailed() throws Exception {
     Query.Builder query = Query.unscoped().byJob(JOB_KEY).active();
     IScheduledTask task2 = buildScheduledTask("job_bar", TASK_ID);
@@ -900,7 +860,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
     storageUtil.expectTaskFetch(query.active());
     expectAuth(ImmutableSet.of("role"), true);
-    expect(cronJobManager.deleteJob(key)).andReturn(true);
 
     control.replay();
     assertOkResponse(thrift.killTasks(query.get(), DEFAULT_LOCK, SESSION));
@@ -1273,13 +1232,34 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(ENOUGH_QUOTA);
 
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false).times(2);
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     cronJobManager.createJob(SanitizedCronJob.from(sanitized));
     control.replay();
     assertResponse(OK, thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION));
   }
 
   @Test
+  public void testScheduleCronFailsCreationDueToExistingNonCron() throws Exception {
+    expectAuth(ROLE, true);
+    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
+    SanitizedConfiguration sanitized =
+        SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB));
+
+    expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(TASK_ID);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(ENOUGH_QUOTA);
+
+    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+    storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active(), buildScheduledTask());
+    control.replay();
+    assertEquals(
+        invalidResponse(jobAlreadyExistsMessage(JOB_KEY)),
+        thrift.scheduleCronJob(CRON_JOB, DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
   public void testScheduleCronUpdatesJob() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
@@ -1309,6 +1289,16 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testScheduleCronJobFailedTaskConfigValidation() throws Exception {
+    expectAuth(ROLE, true);
+    control.replay();
+    IJobConfiguration job = IJobConfiguration.build(makeJob(null));
+    assertResponse(
+        INVALID_REQUEST,
+        thrift.scheduleCronJob(job.newBuilder(), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
   public void testScheduleCronJobFailsLockValidation() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
@@ -1320,7 +1310,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   @Test
   public void testScheduleCronJobFailsWithNoCronSchedule() throws Exception {
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
     control.replay();
 
     assertEquals(
@@ -2696,7 +2685,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(true);
 
     control.replay();
-    assertEquals(invalidResponse(NO_CRON_UPDATES), thrift.startJobUpdate(request, SESSION));
+    assertEquals(invalidResponse(NO_CRON), thrift.startJobUpdate(request, SESSION));
   }
 
   @Test