You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 20:05:30 UTC

git commit: Improve test coverage for CronJobManager.

Updated Branches:
  refs/heads/master b0f268b36 -> ef4e60c1d


Improve test coverage for CronJobManager.

Bugs closed: AURORA-62

Reviewed at https://reviews.apache.org/r/17131/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ef4e60c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ef4e60c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ef4e60c1

Branch: refs/heads/master
Commit: ef4e60c1d0492b8eec8ebe18f3cb830e6dcf7b87
Parents: b0f268b
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 11:04:03 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 11:04:03 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/state/CronJobManager.java  |  67 +++++---
 .../scheduler/state/CronJobManagerTest.java     | 171 +++++++++++++++++--
 2 files changed, 203 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ef4e60c1/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index 5a56a70..371addf 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -147,13 +147,13 @@ public class CronJobManager extends JobManager implements EventSubscriber {
     Stats.exportSize("cron_num_pending_runs", pendingRuns);
   }
 
-  private void mapScheduledJob(IJobConfiguration job, String scheduledJobKey) {
-    IJobKey jobKey = job.getKey();
+  private void mapScheduledJob(SanitizedCronJob cronJob) throws ScheduleException {
+    IJobKey jobKey = cronJob.config.getJobConfig().getKey();
     synchronized (scheduledJobs) {
       Preconditions.checkState(
           !scheduledJobs.containsKey(jobKey),
           "Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
-      scheduledJobs.put(jobKey, scheduledJobKey);
+      scheduledJobs.put(jobKey, scheduleJob(cronJob));
     }
   }
 
@@ -181,7 +181,7 @@ public class CronJobManager extends JobManager implements EventSubscriber {
 
     for (IJobConfiguration job : crons) {
       try {
-        mapScheduledJob(job, scheduleJob(SanitizedConfiguration.fromUnsanitized(job)));
+        mapScheduledJob(new SanitizedCronJob(job, cron));
       } catch (ScheduleException | TaskDescriptionException e) {
         logLaunchFailure(job, e);
       }
@@ -197,14 +197,17 @@ public class CronJobManager extends JobManager implements EventSubscriber {
    * Triggers execution of a job.
    *
    * @param jobKey Key of the job to start.
+   * @throws ScheduleException If the job could not be started with the cron system.
+   * @throws TaskDescriptionException If the stored job associated with {@code jobKey} has field
+   *         validation problems.
    */
-  public void startJobNow(IJobKey jobKey) throws TaskDescriptionException {
+  public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException {
     checkNotNull(jobKey);
 
     Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
     checkArgument(jobConfig.isPresent(), "No such cron job " + JobKeys.toPath(jobKey));
 
-    cronTriggered(SanitizedConfiguration.fromUnsanitized(jobConfig.get()));
+    cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
   }
 
   private void delayedRun(final Query.Builder query, final SanitizedConfiguration config) {
@@ -258,10 +261,10 @@ public class CronJobManager extends JobManager implements EventSubscriber {
   /**
    * Triggers execution of a cron job, depending on the cron collision policy for the job.
    *
-   * @param config The config of the job to be triggered.
+   * @param cronJob The job to be triggered.
    */
-  @VisibleForTesting
-  void cronTriggered(SanitizedConfiguration config) {
+  private void cronTriggered(SanitizedCronJob cronJob) {
+    SanitizedConfiguration config = cronJob.config;
     IJobConfiguration job = config.getJobConfig();
     LOG.info(String.format("Cron triggered for %s at %s with policy %s",
         JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
@@ -347,36 +350,27 @@ public class CronJobManager extends JobManager implements EventSubscriber {
       return false;
     }
 
-    String scheduledJobKey = scheduleJob(config);
+    SanitizedCronJob cronJob = new SanitizedCronJob(config, cron);
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
         storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
       }
     });
-    mapScheduledJob(job, scheduledJobKey);
+    mapScheduledJob(cronJob);
 
     return true;
   }
 
-  private String scheduleJob(final SanitizedConfiguration config) throws ScheduleException {
-    final IJobConfiguration job = config.getJobConfig();
+  private String scheduleJob(final SanitizedCronJob cronJob) throws ScheduleException {
+    IJobConfiguration job = cronJob.config.getJobConfig();
     final String jobPath = JobKeys.toPath(job);
-    if (!hasCronSchedule(job)) {
-      throw new ScheduleException(
-          String.format("Not a valid cronjob, %s has no cron schedule", jobPath));
-    }
-
-    if (!cron.isValidSchedule(job.getCronSchedule())) {
-      throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
-    }
-
     LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
     try {
       return cron.schedule(job.getCronSchedule(), new Runnable() {
         @Override public void run() {
           // TODO(William Farner): May want to record information about job runs.
           LOG.info("Running cron job: " + jobPath);
-          cronTriggered(config);
+          cronTriggered(cronJob);
         }
       });
     } catch (CronException e) {
@@ -446,4 +440,31 @@ public class CronJobManager extends JobManager implements EventSubscriber {
       return ImmutableSet.copyOf(pendingRuns.keySet());
     }
   }
+
+  /**
+   * A cron job validated on construction, for functions that expect field validation up front.
+   */
+  private static class SanitizedCronJob {
+    private final SanitizedConfiguration config;
+
+    SanitizedCronJob(IJobConfiguration unsanitized, CronScheduler cron)
+        throws ScheduleException, TaskDescriptionException {
+
+      this(SanitizedConfiguration.fromUnsanitized(unsanitized), cron);
+    }
+
+    SanitizedCronJob(SanitizedConfiguration config, CronScheduler cron) throws ScheduleException {
+      final IJobConfiguration job = config.getJobConfig();
+      if (!hasCronSchedule(job)) {
+        throw new ScheduleException(
+            String.format("Not a valid cronjob, %s has no cron schedule", JobKeys.toPath(job)));
+      }
+
+      if (!cron.isValidSchedule(job.getCronSchedule())) {
+        throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
+      }
+
+      this.config = config;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ef4e60c1/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
index e9886cd..684e239 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
@@ -19,6 +19,8 @@ import java.util.concurrent.Executor;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -26,6 +28,7 @@ import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
@@ -35,11 +38,13 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.Capture;
@@ -49,11 +54,15 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
+import static org.apache.aurora.scheduler.state.CronJobManager.CRON_USER;
+import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class CronJobManagerTest extends EasyMockTest {
@@ -61,10 +70,11 @@ public class CronJobManagerTest extends EasyMockTest {
   private static final String OWNER = "owner";
   private static final String ENVIRONMENT = "staging11";
   private static final String JOB_NAME = "jobName";
+  private static final String DEFAULT_JOB_KEY = "key";
   private static final IScheduledTask TASK = IScheduledTask.build(new ScheduledTask());
 
   private SchedulerCore scheduler;
-  private StateManagerImpl stateManager;
+  private StateManager stateManager;
   private Executor delayExecutor;
   private Capture<Runnable> delayLaunchCapture;
   private StorageTestUtil storageUtil;
@@ -78,7 +88,7 @@ public class CronJobManagerTest extends EasyMockTest {
   @Before
   public void setUp() throws Exception {
     scheduler = createMock(SchedulerCore.class);
-    stateManager = createMock(StateManagerImpl.class);
+    stateManager = createMock(StateManager.class);
     delayExecutor = createMock(Executor.class);
     delayLaunchCapture = createCapture();
     storageUtil = new StorageTestUtil(this);
@@ -97,19 +107,30 @@ public class CronJobManagerTest extends EasyMockTest {
     sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
   }
 
-  private void expectJobAccepted(IJobConfiguration savedJob) throws Exception {
-    storageUtil.jobStore.saveAcceptedJob(CronJobManager.MANAGER_KEY, savedJob);
+  private void expectJobValidated(IJobConfiguration savedJob) {
     expect(cronScheduler.isValidSchedule(savedJob.getCronSchedule())).andReturn(true);
-    expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
+  }
+
+  private void expectJobValidated() {
+    expectJobValidated(sanitizedConfiguration.getJobConfig());
+  }
+
+  private Capture<Runnable> expectJobAccepted(IJobConfiguration savedJob) throws Exception {
+    expectJobValidated(savedJob);
+    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, savedJob);
+    Capture<Runnable> jobTriggerCapture = createCapture();
+    expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), capture(jobTriggerCapture)))
+        .andReturn(DEFAULT_JOB_KEY);
+    return jobTriggerCapture;
   }
 
   private void expectJobAccepted() throws Exception {
     expectJobAccepted(sanitizedConfiguration.getJobConfig());
   }
 
-  private IExpectationSetters<?> expectJobFetch() {
-    return expect(storageUtil.jobStore.fetchJob(CronJobManager.MANAGER_KEY, job.getKey()))
+  private void expectJobFetch() {
+    expectJobValidated();
+    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
         .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
   }
 
@@ -121,7 +142,7 @@ public class CronJobManagerTest extends EasyMockTest {
   public void testPubsubWiring() throws Exception {
     cronScheduler.start();
     shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-    expect(storageUtil.jobStore.fetchJobs(CronJobManager.MANAGER_KEY))
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
         .andReturn(ImmutableList.<IJobConfiguration>of());
 
     control.replay();
@@ -150,11 +171,29 @@ public class CronJobManagerTest extends EasyMockTest {
 
     // Job is executed immediately since there are no existing tasks to kill.
     stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+    expect(cronScheduler.getSchedule(DEFAULT_JOB_KEY))
+        .andReturn(Optional.of(job.getCronSchedule()))
+        .times(2);
 
     control.replay();
 
+    assertEquals(ImmutableMap.<IJobKey, String>of(), cron.getScheduledJobs());
     cron.receiveJob(sanitizedConfiguration);
+    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
     cron.startJobNow(job.getKey());
+    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
+  }
+
+  @Test
+  public void testDeleteInconsistent() throws Exception {
+    // Tests a case where a job exists in the storage, but is not registered with the cron system.
+
+    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
+        .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
+
+    control.replay();
+
+    assertTrue(cron.deleteJob(job.getKey()));
   }
 
   @Test
@@ -183,7 +222,9 @@ public class CronJobManagerTest extends EasyMockTest {
 
     cron.receiveJob(sanitizedConfiguration);
     cron.startJobNow(job.getKey());
+    assertEquals(ImmutableSet.of(job.getKey()), cron.getPendingRuns());
     delayLaunchCapture.getValue().run();
+    assertEquals(ImmutableSet.<IJobKey>of(), cron.getPendingRuns());
   }
 
   @Test
@@ -241,7 +282,8 @@ public class CronJobManagerTest extends EasyMockTest {
     delayExecutor.execute(capture(delayLaunchCapture));
 
     // The cron manager will then try to initiate the kill.
-    expectJobFetch().times(2);
+    expectJobFetch();
+    expectJobFetch();
     scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
     expectLastCall().times(3);
 
@@ -271,7 +313,7 @@ public class CronJobManagerTest extends EasyMockTest {
         IJobConfiguration.build(job.newBuilder().setCronSchedule("1 2 3 4 5")));
 
     expectJobAccepted();
-    cronScheduler.deschedule("key");
+    cronScheduler.deschedule(DEFAULT_JOB_KEY);
     expectJobAccepted(updated.getJobConfig());
 
     control.replay();
@@ -286,7 +328,7 @@ public class CronJobManagerTest extends EasyMockTest {
         IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
 
     expectJobAccepted();
-    cronScheduler.deschedule("key");
+    cronScheduler.deschedule(DEFAULT_JOB_KEY);
     expectJobAccepted(updated);
 
     control.replay();
@@ -305,6 +347,111 @@ public class CronJobManagerTest extends EasyMockTest {
     cron.updateJob(new SanitizedConfiguration(updated));
   }
 
+  @Test
+  public void testInvalidStoredJob() throws Exception {
+    // Invalid jobs are left alone, and do not halt operation.
+
+    cronScheduler.start();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+    IJobConfiguration jobA =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5 6 7"));
+    IJobConfiguration jobB =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule(null));
+
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA, jobB));
+    expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(false);
+
+    control.replay();
+
+    cron.schedulerActive(new SchedulerActive());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testJobStoredTwice() throws Exception {
+    // Simulate an inconsistent storage that contains two cron jobs under the same key.
+
+    cronScheduler.start();
+    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+    IJobConfiguration jobA =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
+    IJobConfiguration jobB =
+        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("* * * * *"));
+    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
+        .andReturn(ImmutableList.of(jobA, jobB));
+    expectJobValidated(jobA);
+    expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andReturn("keyA");
+    expectJobValidated(jobB);
+
+    control.replay();
+
+    cron.schedulerActive(new SchedulerActive());
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testInvalidCronSchedule() throws Exception {
+    expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(false);
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+  }
+
+  @Test
+  public void testKillExistingCollisionFailedKill() throws Exception {
+    IJobConfiguration killExisting = IJobConfiguration.build(
+        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING));
+    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+    expectActiveTaskFetch(TASK);
+    scheduler.killTasks(Query.jobScoped(killExisting.getKey()).active(), CRON_USER);
+    expectLastCall().andThrow(new ScheduleException("injected"));
+
+    control.replay();
+
+    cron.receiveJob(new SanitizedConfiguration(killExisting));
+    jobTriggerCapture.getValue().run();
+  }
+
+  @Test
+  public void testCancelNewCollision() throws Exception {
+    IJobConfiguration killExisting = IJobConfiguration.build(
+        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));
+    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+    expectActiveTaskFetch(TASK);
+
+    control.replay();
+
+    cron.receiveJob(new SanitizedConfiguration(killExisting));
+    jobTriggerCapture.getValue().run();
+  }
+
+  @Test
+  public void testRunOverlapCollision() throws Exception {
+    IJobConfiguration killExisting = IJobConfiguration.build(
+        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
+    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+    expectActiveTaskFetch(TASK);
+
+    control.replay();
+
+    cron.receiveJob(new SanitizedConfiguration(killExisting));
+    jobTriggerCapture.getValue().run();
+  }
+
+  @Test(expected = ScheduleException.class)
+  public void testScheduleFails() throws Exception {
+    expectJobValidated(job);
+    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, sanitizedConfiguration.getJobConfig());
+    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+        .andThrow(new CronException("injected"));
+
+    control.replay();
+
+    cron.receiveJob(sanitizedConfiguration);
+  }
+
   private IJobConfiguration makeJob() {
     return IJobConfiguration.build(new JobConfiguration()
         .setOwner(new Identity(OWNER, OWNER))