You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 20:05:30 UTC
git commit: Improve test coverage for CronJobManager.
Updated Branches:
refs/heads/master b0f268b36 -> ef4e60c1d
Improve test coverage for CronJobManager.
Bugs closed: AURORA-62
Reviewed at https://reviews.apache.org/r/17131/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ef4e60c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ef4e60c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ef4e60c1
Branch: refs/heads/master
Commit: ef4e60c1d0492b8eec8ebe18f3cb830e6dcf7b87
Parents: b0f268b
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 11:04:03 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 11:04:03 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/state/CronJobManager.java | 67 +++++---
.../scheduler/state/CronJobManagerTest.java | 171 +++++++++++++++++--
2 files changed, 203 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ef4e60c1/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index 5a56a70..371addf 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -147,13 +147,13 @@ public class CronJobManager extends JobManager implements EventSubscriber {
Stats.exportSize("cron_num_pending_runs", pendingRuns);
}
- private void mapScheduledJob(IJobConfiguration job, String scheduledJobKey) {
- IJobKey jobKey = job.getKey();
+ private void mapScheduledJob(SanitizedCronJob cronJob) throws ScheduleException {
+ IJobKey jobKey = cronJob.config.getJobConfig().getKey();
synchronized (scheduledJobs) {
Preconditions.checkState(
!scheduledJobs.containsKey(jobKey),
"Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
- scheduledJobs.put(jobKey, scheduledJobKey);
+ scheduledJobs.put(jobKey, scheduleJob(cronJob));
}
}
@@ -181,7 +181,7 @@ public class CronJobManager extends JobManager implements EventSubscriber {
for (IJobConfiguration job : crons) {
try {
- mapScheduledJob(job, scheduleJob(SanitizedConfiguration.fromUnsanitized(job)));
+ mapScheduledJob(new SanitizedCronJob(job, cron));
} catch (ScheduleException | TaskDescriptionException e) {
logLaunchFailure(job, e);
}
@@ -197,14 +197,17 @@ public class CronJobManager extends JobManager implements EventSubscriber {
* Triggers execution of a job.
*
* @param jobKey Key of the job to start.
+ * @throws ScheduleException If the job could not be started with the cron system.
+ * @throws TaskDescriptionException If the stored job associated with {@code jobKey} has field
+ * validation problems.
*/
- public void startJobNow(IJobKey jobKey) throws TaskDescriptionException {
+ public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException {
checkNotNull(jobKey);
Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
checkArgument(jobConfig.isPresent(), "No such cron job " + JobKeys.toPath(jobKey));
- cronTriggered(SanitizedConfiguration.fromUnsanitized(jobConfig.get()));
+ cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
}
private void delayedRun(final Query.Builder query, final SanitizedConfiguration config) {
@@ -258,10 +261,10 @@ public class CronJobManager extends JobManager implements EventSubscriber {
/**
* Triggers execution of a cron job, depending on the cron collision policy for the job.
*
- * @param config The config of the job to be triggered.
+ * @param cronJob The job to be triggered.
*/
- @VisibleForTesting
- void cronTriggered(SanitizedConfiguration config) {
+ private void cronTriggered(SanitizedCronJob cronJob) {
+ SanitizedConfiguration config = cronJob.config;
IJobConfiguration job = config.getJobConfig();
LOG.info(String.format("Cron triggered for %s at %s with policy %s",
JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
@@ -347,36 +350,27 @@ public class CronJobManager extends JobManager implements EventSubscriber {
return false;
}
- String scheduledJobKey = scheduleJob(config);
+ SanitizedCronJob cronJob = new SanitizedCronJob(config, cron);
storage.write(new MutateWork.NoResult.Quiet() {
@Override protected void execute(Storage.MutableStoreProvider storeProvider) {
storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
}
});
- mapScheduledJob(job, scheduledJobKey);
+ mapScheduledJob(cronJob);
return true;
}
- private String scheduleJob(final SanitizedConfiguration config) throws ScheduleException {
- final IJobConfiguration job = config.getJobConfig();
+ private String scheduleJob(final SanitizedCronJob cronJob) throws ScheduleException {
+ IJobConfiguration job = cronJob.config.getJobConfig();
final String jobPath = JobKeys.toPath(job);
- if (!hasCronSchedule(job)) {
- throw new ScheduleException(
- String.format("Not a valid cronjob, %s has no cron schedule", jobPath));
- }
-
- if (!cron.isValidSchedule(job.getCronSchedule())) {
- throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
- }
-
LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
try {
return cron.schedule(job.getCronSchedule(), new Runnable() {
@Override public void run() {
// TODO(William Farner): May want to record information about job runs.
LOG.info("Running cron job: " + jobPath);
- cronTriggered(config);
+ cronTriggered(cronJob);
}
});
} catch (CronException e) {
@@ -446,4 +440,31 @@ public class CronJobManager extends JobManager implements EventSubscriber {
return ImmutableSet.copyOf(pendingRuns.keySet());
}
}
+
+ /**
+ * Used by functions that expect field validation before being called.
+ */
+ private static class SanitizedCronJob {
+ private final SanitizedConfiguration config;
+
+ SanitizedCronJob(IJobConfiguration unsanitized, CronScheduler cron)
+ throws ScheduleException, TaskDescriptionException {
+
+ this(SanitizedConfiguration.fromUnsanitized(unsanitized), cron);
+ }
+
+ SanitizedCronJob(SanitizedConfiguration config, CronScheduler cron) throws ScheduleException {
+ final IJobConfiguration job = config.getJobConfig();
+ if (!hasCronSchedule(job)) {
+ throw new ScheduleException(
+ String.format("Not a valid cronjob, %s has no cron schedule", JobKeys.toPath(job)));
+ }
+
+ if (!cron.isValidSchedule(job.getCronSchedule())) {
+ throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
+ }
+
+ this.config = config;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ef4e60c1/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
index e9886cd..684e239 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
@@ -19,6 +19,8 @@ import java.util.concurrent.Executor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -26,6 +28,7 @@ import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.CronCollisionPolicy;
import org.apache.aurora.gen.ExecutorConfig;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobConfiguration;
@@ -35,11 +38,13 @@ import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
import org.apache.aurora.scheduler.cron.CronScheduler;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
@@ -49,11 +54,15 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
+import static org.apache.aurora.scheduler.state.CronJobManager.CRON_USER;
+import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CronJobManagerTest extends EasyMockTest {
@@ -61,10 +70,11 @@ public class CronJobManagerTest extends EasyMockTest {
private static final String OWNER = "owner";
private static final String ENVIRONMENT = "staging11";
private static final String JOB_NAME = "jobName";
+ private static final String DEFAULT_JOB_KEY = "key";
private static final IScheduledTask TASK = IScheduledTask.build(new ScheduledTask());
private SchedulerCore scheduler;
- private StateManagerImpl stateManager;
+ private StateManager stateManager;
private Executor delayExecutor;
private Capture<Runnable> delayLaunchCapture;
private StorageTestUtil storageUtil;
@@ -78,7 +88,7 @@ public class CronJobManagerTest extends EasyMockTest {
@Before
public void setUp() throws Exception {
scheduler = createMock(SchedulerCore.class);
- stateManager = createMock(StateManagerImpl.class);
+ stateManager = createMock(StateManager.class);
delayExecutor = createMock(Executor.class);
delayLaunchCapture = createCapture();
storageUtil = new StorageTestUtil(this);
@@ -97,19 +107,30 @@ public class CronJobManagerTest extends EasyMockTest {
sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
}
- private void expectJobAccepted(IJobConfiguration savedJob) throws Exception {
- storageUtil.jobStore.saveAcceptedJob(CronJobManager.MANAGER_KEY, savedJob);
+ private void expectJobValidated(IJobConfiguration savedJob) {
expect(cronScheduler.isValidSchedule(savedJob.getCronSchedule())).andReturn(true);
- expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key");
+ }
+
+ private void expectJobValidated() {
+ expectJobValidated(sanitizedConfiguration.getJobConfig());
+ }
+
+ private Capture<Runnable> expectJobAccepted(IJobConfiguration savedJob) throws Exception {
+ expectJobValidated(savedJob);
+ storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, savedJob);
+ Capture<Runnable> jobTriggerCapture = createCapture();
+ expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), capture(jobTriggerCapture)))
+ .andReturn(DEFAULT_JOB_KEY);
+ return jobTriggerCapture;
}
private void expectJobAccepted() throws Exception {
expectJobAccepted(sanitizedConfiguration.getJobConfig());
}
- private IExpectationSetters<?> expectJobFetch() {
- return expect(storageUtil.jobStore.fetchJob(CronJobManager.MANAGER_KEY, job.getKey()))
+ private void expectJobFetch() {
+ expectJobValidated();
+ expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
.andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
}
@@ -121,7 +142,7 @@ public class CronJobManagerTest extends EasyMockTest {
public void testPubsubWiring() throws Exception {
cronScheduler.start();
shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
- expect(storageUtil.jobStore.fetchJobs(CronJobManager.MANAGER_KEY))
+ expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
.andReturn(ImmutableList.<IJobConfiguration>of());
control.replay();
@@ -150,11 +171,29 @@ public class CronJobManagerTest extends EasyMockTest {
// Job is executed immediately since there are no existing tasks to kill.
stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
+ expect(cronScheduler.getSchedule(DEFAULT_JOB_KEY))
+ .andReturn(Optional.of(job.getCronSchedule()))
+ .times(2);
control.replay();
+ assertEquals(ImmutableMap.<IJobKey, String>of(), cron.getScheduledJobs());
cron.receiveJob(sanitizedConfiguration);
+ assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
cron.startJobNow(job.getKey());
+ assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
+ }
+
+ @Test
+ public void testDeleteInconsistent() throws Exception {
+ // Tests a case where a job exists in the storage, but is not registered with the cron system.
+
+ expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
+ .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
+
+ control.replay();
+
+ assertTrue(cron.deleteJob(job.getKey()));
}
@Test
@@ -183,7 +222,9 @@ public class CronJobManagerTest extends EasyMockTest {
cron.receiveJob(sanitizedConfiguration);
cron.startJobNow(job.getKey());
+ assertEquals(ImmutableSet.of(job.getKey()), cron.getPendingRuns());
delayLaunchCapture.getValue().run();
+ assertEquals(ImmutableSet.<IJobKey>of(), cron.getPendingRuns());
}
@Test
@@ -241,7 +282,8 @@ public class CronJobManagerTest extends EasyMockTest {
delayExecutor.execute(capture(delayLaunchCapture));
// The cron manager will then try to initiate the kill.
- expectJobFetch().times(2);
+ expectJobFetch();
+ expectJobFetch();
scheduler.killTasks((Query.Builder) anyObject(), eq(CronJobManager.CRON_USER));
expectLastCall().times(3);
@@ -271,7 +313,7 @@ public class CronJobManagerTest extends EasyMockTest {
IJobConfiguration.build(job.newBuilder().setCronSchedule("1 2 3 4 5")));
expectJobAccepted();
- cronScheduler.deschedule("key");
+ cronScheduler.deschedule(DEFAULT_JOB_KEY);
expectJobAccepted(updated.getJobConfig());
control.replay();
@@ -286,7 +328,7 @@ public class CronJobManagerTest extends EasyMockTest {
IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
expectJobAccepted();
- cronScheduler.deschedule("key");
+ cronScheduler.deschedule(DEFAULT_JOB_KEY);
expectJobAccepted(updated);
control.replay();
@@ -305,6 +347,111 @@ public class CronJobManagerTest extends EasyMockTest {
cron.updateJob(new SanitizedConfiguration(updated));
}
+ @Test
+ public void testInvalidStoredJob() throws Exception {
+ // Invalid jobs are left alone, but doesn't halt operation.
+
+ cronScheduler.start();
+ shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+ IJobConfiguration jobA =
+ IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5 6 7"));
+ IJobConfiguration jobB =
+ IJobConfiguration.build(makeJob().newBuilder().setCronSchedule(null));
+
+ expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA, jobB));
+ expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(false);
+
+ control.replay();
+
+ cron.schedulerActive(new SchedulerActive());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testJobStoredTwice() throws Exception {
+ // Simulate an inconsistent storage that contains two cron jobs under the same key.
+
+ cronScheduler.start();
+ shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
+
+ IJobConfiguration jobA =
+ IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
+ IJobConfiguration jobB =
+ IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("* * * * *"));
+ expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
+ .andReturn(ImmutableList.of(jobA, jobB));
+ expectJobValidated(jobA);
+ expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+ .andReturn("keyA");
+ expectJobValidated(jobB);
+
+ control.replay();
+
+ cron.schedulerActive(new SchedulerActive());
+ }
+
+ @Test(expected = ScheduleException.class)
+ public void testInvalidCronSchedule() throws Exception {
+ expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(false);
+
+ control.replay();
+
+ cron.receiveJob(sanitizedConfiguration);
+ }
+
+ @Test
+ public void testKillExistingCollisionFailedKill() throws Exception {
+ IJobConfiguration killExisting = IJobConfiguration.build(
+ job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.KILL_EXISTING));
+ Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+ expectActiveTaskFetch(TASK);
+ scheduler.killTasks(Query.jobScoped(killExisting.getKey()).active(), CRON_USER);
+ expectLastCall().andThrow(new ScheduleException("injected"));
+
+ control.replay();
+
+ cron.receiveJob(new SanitizedConfiguration(killExisting));
+ jobTriggerCapture.getValue().run();
+ }
+
+ @Test
+ public void testCancelNewCollision() throws Exception {
+ IJobConfiguration killExisting = IJobConfiguration.build(
+ job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));
+ Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+ expectActiveTaskFetch(TASK);
+
+ control.replay();
+
+ cron.receiveJob(new SanitizedConfiguration(killExisting));
+ jobTriggerCapture.getValue().run();
+ }
+
+ @Test
+ public void testRunOverlapCollision() throws Exception {
+ IJobConfiguration killExisting = IJobConfiguration.build(
+ job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
+ Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
+ expectActiveTaskFetch(TASK);
+
+ control.replay();
+
+ cron.receiveJob(new SanitizedConfiguration(killExisting));
+ jobTriggerCapture.getValue().run();
+ }
+
+ @Test(expected = ScheduleException.class)
+ public void testScheduleFails() throws Exception {
+ expectJobValidated(job);
+ storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, sanitizedConfiguration.getJobConfig());
+ expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
+ .andThrow(new CronException("injected"));
+
+ control.replay();
+
+ cron.receiveJob(sanitizedConfiguration);
+ }
+
private IJobConfiguration makeJob() {
return IJobConfiguration.build(new JobConfiguration()
.setOwner(new Identity(OWNER, OWNER))