You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/05/15 04:34:42 UTC
[2/5] CronScheduler based on Quartz
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
new file mode 100644
index 0000000..e7d1c14
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobExecutionException;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AuroraCronJobTest extends EasyMockTest {
+ public static final String TASK_ID = "A";
+ private Storage storage;
+ private StateManager stateManager;
+ private CronJobManager cronJobManager;
+ private BackoffHelper backoffHelper;
+
+ private AuroraCronJob auroraCronJob;
+
+ private static final String MANAGER_ID = "MANAGER_ID";
+
+ @Before
+ public void setUp() {
+ storage = MemStorage.newEmptyStorage();
+ stateManager = createMock(StateManager.class);
+ cronJobManager = createMock(CronJobManager.class);
+ backoffHelper = createMock(BackoffHelper.class);
+
+ auroraCronJob = new AuroraCronJob(
+ new AuroraCronJob.Config(backoffHelper), storage, stateManager, cronJobManager);
+ expect(cronJobManager.getManagerKey()).andStubReturn(MANAGER_ID);
+ }
+
+ @Test
+ public void testExecuteNonexistentIsNoop() throws JobExecutionException {
+ control.replay();
+
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test
+ public void testInvalidConfigIsNoop() throws JobExecutionException {
+ control.replay();
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getJobStore().saveAcceptedJob(
+ MANAGER_ID,
+ IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder().setCronSchedule(null)));
+ }
+ });
+
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test
+ public void testEmptyStorage() throws JobExecutionException {
+ stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+ expectLastCall().times(3);
+
+ control.replay();
+ populateStorage(CronCollisionPolicy.CANCEL_NEW);
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ storage = MemStorage.newEmptyStorage();
+
+ populateStorage(CronCollisionPolicy.KILL_EXISTING);
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ storage = MemStorage.newEmptyStorage();
+
+ populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test
+ public void testCancelNew() throws JobExecutionException {
+ control.replay();
+
+ populateTaskStore();
+ populateStorage(CronCollisionPolicy.CANCEL_NEW);
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test
+ public void testKillExisting() throws Exception {
+ Capture<Supplier<Boolean>> capture = createCapture();
+
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.KILLING,
+ AuroraCronJob.KILL_AUDIT_MESSAGE))
+ .andReturn(true);
+ backoffHelper.doUntilSuccess(EasyMock.capture(capture));
+ stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+
+ control.replay();
+
+ populateStorage(CronCollisionPolicy.KILL_EXISTING);
+ populateTaskStore();
+ auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ assertFalse(capture.getValue().get());
+ storage.write(
+ new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().deleteAllTasks();
+ }
+ });
+ assertTrue(capture.getValue().get());
+ }
+
+ private void populateTaskStore() {
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+ IScheduledTask.build(new ScheduledTask()
+ .setStatus(ScheduleStatus.RUNNING)
+ .setAssignedTask(new AssignedTask()
+ .setTaskId(TASK_ID)
+ .setTask(QuartzTestUtil.JOB.getTaskConfig().newBuilder())))
+ ));
+ }
+ });
+ }
+
+ private void populateStorage(final CronCollisionPolicy policy) {
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getJobStore().saveAcceptedJob(
+ MANAGER_ID,
+ QuartzTestUtil.makeSanitizedCronJob(policy).getSanitizedConfig().getJobConfig());
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
new file mode 100644
index 0000000..21e8278
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -0,0 +1,257 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.util.concurrent.Service;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import com.google.inject.util.Modules;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobExecutionContext;
+import org.quartz.Scheduler;
+import org.quartz.Trigger;
+import org.quartz.TriggerListener;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CronIT extends EasyMockTest {
+ public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
+
+ private static final IJobKey JOB_KEY = JobKeys.from("roll", "b", "c");
+ private static final Identity IDENTITY = new Identity()
+ .setRole(JOB_KEY.getRole())
+ .setUser("user");
+
+ private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(
+ new JobConfiguration()
+ .setCronSchedule(CRONTAB_ENTRY.toString())
+ .setKey(JOB_KEY.newBuilder())
+ .setInstanceCount(2)
+ .setOwner(IDENTITY)
+ .setTaskConfig(new TaskConfig()
+ .setJobName(JOB_KEY.getName())
+ .setEnvironment(JOB_KEY.getEnvironment())
+ .setOwner(IDENTITY)
+ .setExecutorConfig(new ExecutorConfig()
+ .setName("cmd.exe")
+ .setData("echo hello world"))
+ .setNumCpus(7)
+ .setRamMb(8)
+ .setDiskMb(9))
+ );
+
+ private ShutdownRegistry shutdownRegistry;
+ private EventSink eventSink;
+ private Injector injector;
+ private StateManager stateManager;
+ private Storage storage;
+ private AuroraCronJob auroraCronJob;
+
+ private Capture<ExceptionalCommand<?>> shutdown;
+
+ @Before
+ public void setUp() throws Exception {
+ shutdownRegistry = createMock(ShutdownRegistry.class);
+ stateManager = createMock(StateManager.class);
+ storage = MemStorage.newEmptyStorage();
+ auroraCronJob = createMock(AuroraCronJob.class);
+
+ injector = Guice.createInjector(
+ // Override to verify that Guice is actually used for construction of the AuroraCronJob.
+ // TODO(ksweeney): Use the production class here.
+ Modules.override(new CronModule()).with(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(AuroraCronJob.class).toInstance(auroraCronJob);
+ }
+ }), new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+ bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+ bind(StateManager.class).toInstance(stateManager);
+ bind(Storage.class).toInstance(storage);
+
+ PubsubTestUtil.installPubsub(binder());
+ }
+ });
+ eventSink = PubsubTestUtil.startPubsub(injector);
+
+ shutdown = createCapture();
+ shutdownRegistry.addAction(capture(shutdown));
+ }
+
+ private void boot() {
+ eventSink.post(new PubsubEvent.SchedulerActive());
+ }
+
+ @Test
+ public void testCronSchedulerLifecycle() throws Exception {
+ control.replay();
+
+ Scheduler scheduler = injector.getInstance(Scheduler.class);
+ assertTrue(!scheduler.isStarted());
+
+ boot();
+ Service cronLifecycle = injector.getInstance(CronLifecycle.class);
+
+ assertTrue(cronLifecycle.isRunning());
+ assertTrue(scheduler.isStarted());
+
+ shutdown.getValue().execute();
+
+ assertTrue(!cronLifecycle.isRunning());
+ assertTrue(scheduler.isShutdown());
+ }
+
+ @Test
+ public void testJobsAreScheduled() throws Exception {
+ auroraCronJob.execute(isA(JobExecutionContext.class));
+
+ control.replay();
+ final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
+ final Scheduler scheduler = injector.getInstance(Scheduler.class);
+
+ storage.write(new Storage.MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(Storage.MutableStoreProvider storeProvider) {
+ storeProvider.getJobStore().saveAcceptedJob(
+ cronJobManager.getManagerKey(),
+ CRON_JOB);
+ }
+ });
+
+ final CountDownLatch cronRan = new CountDownLatch(1);
+ scheduler.getListenerManager().addTriggerListener(new CountDownWhenComplete(cronRan));
+ boot();
+
+ cronRan.await();
+
+ shutdown.getValue().execute();
+ }
+
+ @Test
+ public void testKillExistingDogpiles() throws Exception {
+ // Test that a trigger for a job that hasn't finished running is ignored.
+ final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
+
+ final CountDownLatch firstExecutionTriggered = new CountDownLatch(1);
+ final CountDownLatch firstExecutionCompleted = new CountDownLatch(1);
+ final CountDownLatch secondExecutionTriggered = new CountDownLatch(1);
+ final CountDownLatch secondExecutionCompleted = new CountDownLatch(1);
+
+ auroraCronJob.execute(isA(JobExecutionContext.class));
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ firstExecutionTriggered.countDown();
+ firstExecutionCompleted.await();
+ return null;
+ }
+ });
+ auroraCronJob.execute(isA(JobExecutionContext.class));
+ expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Void answer() throws Throwable {
+ secondExecutionTriggered.countDown();
+ secondExecutionCompleted.await();
+ return null;
+ }
+ });
+
+ control.replay();
+
+ boot();
+
+ cronJobManager.createJob(SanitizedCronJob.fromUnsanitized(CRON_JOB));
+ cronJobManager.startJobNow(JOB_KEY);
+ firstExecutionTriggered.await();
+ cronJobManager.startJobNow(JOB_KEY);
+ assertEquals(1, secondExecutionTriggered.getCount());
+ firstExecutionCompleted.countDown();
+ secondExecutionTriggered.await();
+ secondExecutionTriggered.countDown();
+ }
+
+ private static class CountDownWhenComplete implements TriggerListener {
+ private final CountDownLatch countDownLatch;
+
+ CountDownWhenComplete(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public String getName() {
+ return CountDownWhenComplete.class.getName();
+ }
+
+ @Override
+ public void triggerFired(Trigger trigger, JobExecutionContext context) {
+ }
+
+ @Override
+ public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
+ return false;
+ }
+
+ @Override
+ public void triggerMisfired(Trigger trigger) {
+ // No-op.
+ }
+
+ @Override
+ public void triggerComplete(
+ Trigger trigger,
+ JobExecutionContext context,
+ Trigger.CompletedExecutionInstruction triggerInstructionCode) {
+
+ countDownLatch.countDown();
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
new file mode 100644
index 0000000..b9116a4
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -0,0 +1,221 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.Trigger;
+import org.quartz.impl.matchers.GroupMatcher;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CronJobManagerImplTest extends EasyMockTest {
+ private Storage storage;
+ private Scheduler scheduler;
+
+ private CronJobManager cronJobManager;
+
+ @Before
+ public void setUp() {
+ storage = MemStorage.newEmptyStorage();
+ scheduler = createMock(Scheduler.class);
+
+ cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));
+ }
+
+ @Test
+ public void testStartJobNowExistent() throws Exception {
+ populateStorage();
+ scheduler.triggerJob(QuartzTestUtil.QUARTZ_JOB_KEY);
+
+ control.replay();
+
+ cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test(expected = CronException.class)
+ public void testStartJobNowNonexistent() throws Exception {
+ control.replay();
+
+ cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
+ }
+
+ @Test
+ public void testUpdateExistingJob() throws Exception {
+ SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+
+ expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
+ expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
+ .andReturn(null);
+
+ populateStorage();
+
+ control.replay();
+
+ cronJobManager.updateJob(sanitizedCronJob);
+ assertEquals(sanitizedCronJob.getSanitizedConfig().getJobConfig(), fetchFromStorage().orNull());
+ }
+
+ @Test
+ public void testUpdateNonexistentJob() throws Exception {
+ control.replay();
+
+ try {
+ cronJobManager.updateJob(QuartzTestUtil.makeUpdatedJob());
+ fail();
+ } catch (CronException e) {
+ // Expected.
+ }
+
+ assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+ }
+
+ @Test
+ public void testCreateNonexistentJob() throws Exception {
+ SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+
+ expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
+ .andReturn(null);
+
+ control.replay();
+
+ cronJobManager.createJob(sanitizedCronJob);
+
+ assertEquals(
+ sanitizedCronJob.getSanitizedConfig().getJobConfig(),
+ fetchFromStorage().orNull());
+ }
+
+ @Test(expected = CronException.class)
+ public void testCreateExistingJobFails() throws Exception {
+ SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+ populateStorage();
+ control.replay();
+
+ cronJobManager.createJob(sanitizedCronJob);
+ }
+
+ @Test
+ public void testGetJobs() throws Exception {
+ control.replay();
+ assertEquals(Collections.emptyList(), ImmutableList.copyOf(cronJobManager.getJobs()));
+
+ populateStorage();
+ assertEquals(
+ QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig(),
+ Iterables.getOnlyElement(cronJobManager.getJobs()));
+ }
+
+ @Test
+ public void testNoRunOverlap() throws Exception {
+ SanitizedCronJob runOverlapJob = SanitizedCronJob.fromUnsanitized(
+ IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder()
+ .setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP)));
+
+ control.replay();
+
+ try {
+ cronJobManager.createJob(runOverlapJob);
+ fail();
+ } catch (CronException e) {
+ // Expected.
+ }
+
+ try {
+ cronJobManager.updateJob(runOverlapJob);
+ } catch (CronException e) {
+ // Expected.
+ }
+
+ assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+ }
+
+ @Test
+ public void testDeleteJob() throws Exception {
+ expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
+
+ control.replay();
+
+ assertFalse(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
+ populateStorage();
+ assertTrue(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
+ assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+ }
+
+ @Test
+ public void testGetScheduledJobs() throws Exception {
+ JobDetail jobDetail = createMock(JobDetail.class);
+ expect(scheduler.getJobKeys(EasyMock.<GroupMatcher<JobKey>>anyObject()))
+ .andReturn(ImmutableSet.of(QuartzTestUtil.QUARTZ_JOB_KEY));
+ expect(scheduler.getJobDetail(QuartzTestUtil.QUARTZ_JOB_KEY))
+ .andReturn(jobDetail);
+ expect(jobDetail.getDescription()).andReturn("* * * * *");
+
+ control.replay();
+
+ Map<IJobKey, CrontabEntry> scheduledJobs = cronJobManager.getScheduledJobs();
+ assertEquals(CrontabEntry.parse("* * * * *"), scheduledJobs.get(QuartzTestUtil.AURORA_JOB_KEY));
+ }
+
+ private void populateStorage() throws Exception {
+ storage.write(new Storage.MutateWork.NoResult<Exception>() {
+ @Override
+ protected void execute(Storage.MutableStoreProvider storeProvider) throws Exception {
+ storeProvider.getJobStore().saveAcceptedJob(
+ cronJobManager.getManagerKey(),
+ QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig());
+ }
+ });
+ }
+
+ private Optional<IJobConfiguration> fetchFromStorage() {
+ return storage.consistentRead(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
+ @Override
+ public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
+ return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(),
+ QuartzTestUtil.AURORA_JOB_KEY);
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
new file mode 100644
index 0000000..6bc97f3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+
+import org.apache.aurora.scheduler.cron.ExpectedPrediction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CronPredictorImplTest {
+ private static final TimeZone TIME_ZONE = TimeZone.getTimeZone("GMT");
+ public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
+
+ private CronPredictor cronPredictor;
+
+ private FakeClock clock;
+
+ @Before
+ public void setUp() {
+ clock = new FakeClock();
+ cronPredictor = new CronPredictorImpl(clock, TIME_ZONE);
+ }
+
+ @Test
+ public void testValidSchedule() {
+ clock.advance(Amount.of(1L, Time.DAYS));
+ Date expectedPrediction = new Date(Amount.of(1L, Time.DAYS).as(Time.MILLISECONDS)
+ + Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS));
+ assertEquals(expectedPrediction, cronPredictor.predictNextRun(CrontabEntry.parse("* * * * *")));
+ }
+
+ @Test
+ public void testCronExpressions() {
+ assertEquals("0 * * ? * 1,2,3,4,5,6,7",
+ Quartz.cronExpression(CRONTAB_ENTRY, TIME_ZONE).getCronExpression());
+ }
+
+ @Test
+ public void testCronPredictorConforms() throws Exception {
+ for (ExpectedPrediction expectedPrediction : ExpectedPrediction.getAll()) {
+ List<Date> results = Lists.newArrayList();
+ clock.setNowMillis(0);
+ for (int i = 0; i < expectedPrediction.getTriggerTimes().size(); i++) {
+ Date nextTriggerTime = cronPredictor.predictNextRun(expectedPrediction.parseCrontabEntry());
+ results.add(nextTriggerTime);
+ clock.setNowMillis(nextTriggerTime.getTime());
+ }
+ assertEquals(
+ "Cron schedule " + expectedPrediction.getSchedule() + " made unexpected predictions.",
+ Lists.transform(
+ expectedPrediction.getTriggerTimes(),
+ new Function<Long, Date>() {
+ @Override
+ public Date apply(Long time) {
+ return new Date(time);
+ }
+ }
+ ),
+ results);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
new file mode 100644
index 0000000..b10f514
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import com.google.common.base.Throwables;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.JobKey;
+
+/**
+ * Fixtures used across quartz tests.
+ */
+final class QuartzTestUtil {
+ static final IJobKey AURORA_JOB_KEY = JobKeys.from("role", "env", "job");
+ static final IJobConfiguration JOB = IJobConfiguration.build(
+ new JobConfiguration()
+ .setCronSchedule("* * * * SUN")
+ .setInstanceCount(10)
+ .setOwner(new Identity("role", "user"))
+ .setKey(AURORA_JOB_KEY.newBuilder())
+ .setTaskConfig(new TaskConfig()
+ .setOwner(new Identity("role", "user"))
+ .setJobName(AURORA_JOB_KEY.getName())
+ .setEnvironment(AURORA_JOB_KEY.getEnvironment())
+ .setDiskMb(3)
+ .setRamMb(4)
+ .setNumCpus(5)
+ .setExecutorConfig(new ExecutorConfig()
+ .setName("cmd.exe")
+ .setData("echo hello world")))
+ );
+ static final JobKey QUARTZ_JOB_KEY = Quartz.jobKey(AURORA_JOB_KEY);
+
+ private QuartzTestUtil() {
+ // Utility class.
+ }
+
+ static SanitizedCronJob makeSanitizedCronJob(CronCollisionPolicy collisionPolicy) {
+ try {
+ return SanitizedCronJob.fromUnsanitized(
+ IJobConfiguration.build(JOB.newBuilder().setCronCollisionPolicy(collisionPolicy)));
+ } catch (CronException | ConfigurationManager.TaskDescriptionException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ static SanitizedCronJob makeSanitizedCronJob() {
+ return makeSanitizedCronJob(CronCollisionPolicy.KILL_EXISTING);
+ }
+
+ static SanitizedCronJob makeUpdatedJob() throws Exception {
+ return SanitizedCronJob.fromUnsanitized(
+ IJobConfiguration.build(JOB.newBuilder().setCronSchedule("* * 1 * *")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index da6c0ff..0e17f49 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -38,15 +38,12 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.CronCollisionPolicy;
import org.apache.aurora.gen.ExecutorConfig;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobConfiguration;
@@ -68,7 +65,10 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.quota.QuotaCheckResult;
@@ -85,6 +85,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
import org.apache.mesos.Protos.SlaveID;
import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -106,9 +107,9 @@ import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFIC
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.reportMatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -128,25 +129,23 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private static final IJobKey KEY_A = JobKeys.from(ROLE_A, ENV_A, JOB_A);
private static final int ONE_GB = 1024;
- private static final String ROLE_B = "Test_Role_B";
- private static final IJobKey KEY_B = JobKeys.from(ROLE_B, ENV_A, JOB_A);
-
private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
private static final String SLAVE_HOST_1 = "SlaveHost1";
private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
+ public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("1 1 1 1 *");
+ public static final String RAW_CRONTAB_ENTRY = CRONTAB_ENTRY.toString();
+
private Driver driver;
private StateManagerImpl stateManager;
private Storage storage;
private SchedulerCoreImpl scheduler;
- private CronScheduler cronScheduler;
- private CronJobManager cron;
+ private CronJobManager cronJobManager;
private FakeClock clock;
private EventSink eventSink;
private RescheduleCalculator rescheduleCalculator;
- private ShutdownRegistry shutdownRegistry;
private QuotaManager quotaManager;
// TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
@@ -164,18 +163,15 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
clock = new FakeClock();
eventSink = createMock(EventSink.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
- cronScheduler = createMock(CronScheduler.class);
- shutdownRegistry = createMock(ShutdownRegistry.class);
+ cronJobManager = createMock(CronJobManager.class);
quotaManager = createMock(QuotaManager.class);
eventSink.post(EasyMock.<PubsubEvent>anyObject());
expectLastCall().anyTimes();
- expect(cronScheduler.schedule(anyObject(String.class), anyObject(Runnable.class)))
- .andStubReturn("key");
- expect(cronScheduler.isValidSchedule(anyObject(String.class))).andStubReturn(true);
expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
.andStubReturn(ENOUGH_QUOTA);
+ expect(cronJobManager.hasJob(anyObject(IJobKey.class))).andStubReturn(false);
}
/**
@@ -208,15 +204,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
taskIdGenerator,
eventSink,
rescheduleCalculator);
- cron = new CronJobManager(
- stateManager,
- storage,
- cronScheduler,
- shutdownRegistry,
- MoreExecutors.sameThreadExecutor());
scheduler = new SchedulerCoreImpl(
storage,
- cron,
+ cronJobManager,
stateManager,
taskIdGenerator,
quotaManager);
@@ -250,6 +240,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet());
}
+ @Test
+ public void testCreateJobEmptyString() throws Exception {
+ // TODO(ksweeney): Deprecate this as part of AURORA-423.
+
+ control.replay();
+ buildScheduler();
+
+ SanitizedConfiguration job = SanitizedConfiguration.fromUnsanitized(
+ IJobConfiguration.build(makeJob(KEY_A, 1).getJobConfig().newBuilder().setCronSchedule("")));
+ scheduler.createJob(job);
+ assertTaskCount(1);
+ }
+
private static Constraint dedicatedConstraint(Set<String> values) {
return new Constraint(DEDICATED_ATTRIBUTE,
TaskConstraint.value(new ValueConstraint(false, values)));
@@ -272,7 +275,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
buildScheduler();
TaskConfig newTask = nonProductionTask();
- newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.toPath(KEY_A))));
+ newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.canonicalString(KEY_A))));
scheduler.createJob(makeJob(KEY_A, newTask));
assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
}
@@ -445,147 +448,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.createJob(makeJob(KEY_A, 1));
}
- @Test(expected = ScheduleException.class)
- public void testCreateDuplicateCronJob() throws Exception {
- SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
-
- control.replay();
- buildScheduler();
-
- // Cron jobs are scheduled on a delay, so this job's tasks will not be scheduled immediately,
- // but duplicate jobs should still be rejected.
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
-
- scheduler.createJob(makeJob(KEY_A, 1));
- }
-
- @Test
- public void testStartCronJob() throws Exception {
- // Create a cron job, ask the scheduler to start it, and ensure that the tasks exist
- // in the PENDING state.
-
- SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
- IJobKey jobKey = sanitizedConfiguration.getJobConfig().getKey();
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
-
- cron.startJobNow(jobKey);
- assertEquals(PENDING, getOnlyTask(Query.jobScoped(jobKey)).getStatus());
- }
-
- @Test(expected = ScheduleException.class)
- public void testStartNonexistentCronJob() throws Exception {
- // Try to start a cron job that doesn't exist.
- control.replay();
- buildScheduler();
-
- cron.startJobNow(KEY_A);
- }
-
- @Test
- public void testStartNonCronJob() throws Exception {
- // Create a NON cron job and try to start it as though it were a cron job, and ensure that
- // no cron tasks are created.
- control.replay();
- buildScheduler();
-
- scheduler.createJob(makeJob(KEY_A, 1));
- String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
-
- try {
- cron.startJobNow(KEY_A);
- fail("Start should have failed.");
- } catch (ScheduleException e) {
- // Expected.
- }
-
- assertEquals(PENDING, getTask(taskId).getStatus());
- assertFalse(cron.hasJob(KEY_A));
- }
-
- @Test(expected = ScheduleException.class)
- public void testStartNonOwnedCronJob() throws Exception {
- // Try to start a cron job that is not owned by us.
- // Should throw an exception.
-
- SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
- IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(true);
- expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
-
- cron.startJobNow(KEY_B);
- }
-
- @Test
- public void testStartRunningCronJob() throws Exception {
- // Start a cron job that is already started by an earlier
- // call and is PENDING. Make sure it follows the cron collision policy.
- SanitizedConfiguration sanitizedConfiguration =
- makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
- expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
- EasyMock.<Runnable>anyObject()))
- .andReturn("key");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
- assertTrue(cron.hasJob(KEY_A));
-
- cron.startJobNow(KEY_A);
- assertTaskCount(1);
-
- String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
-
- // Now start the same cron job immediately.
- cron.startJobNow(KEY_A);
- assertTaskCount(1);
- assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
-
- // Make sure the pending job is the new one.
- String newTaskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
- assertFalse(taskId.equals(newTaskId));
- }
-
- @Test
- public void testKillCreateCronJob() throws Exception {
- SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
- IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key");
- cronScheduler.deschedule("key");
-
- SanitizedConfiguration updated = makeCronJob(KEY_A, 1, "1 2 3 4 5");
- IJobConfiguration updatedJob = updated.getJobConfig();
- expect(cronScheduler.schedule(eq(updatedJob.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key2");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTrue(cron.hasJob(KEY_A));
-
- scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
- scheduler.createJob(updated);
-
- IJobConfiguration stored = Iterables.getOnlyElement(cron.getJobs());
- assertEquals(updatedJob.getCronSchedule(), stored.getCronSchedule());
- }
-
@Test
public void testKillTask() throws Exception {
driver.killTask(EasyMock.<String>anyObject());
@@ -638,17 +500,21 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testServiceTasksRescheduled() throws Exception {
int numServiceTasks = 5;
+ IJobKey adhocKey = KEY_A;
+ IJobKey serviceKey = IJobKey.build(
+ adhocKey.newBuilder().setName(adhocKey.getName() + "service"));
expectTaskNotThrottled().times(numServiceTasks);
+ expectNoCronJob(adhocKey);
+ expectNoCronJob(serviceKey);
control.replay();
buildScheduler();
// Schedule 5 service and 5 non-service tasks.
- scheduler.createJob(makeJob(KEY_A, numServiceTasks));
+ scheduler.createJob(makeJob(adhocKey, numServiceTasks));
TaskConfig task = productionTask().setIsService(true);
- scheduler.createJob(
- makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+ scheduler.createJob(makeJob(serviceKey, task, 5));
assertEquals(10, getTasksByStatus(PENDING).size());
changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
@@ -675,6 +541,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
int totalFailures = 10;
expectTaskNotThrottled().times(totalFailures);
+ expectNoCronJob(KEY_A);
control.replay();
buildScheduler();
@@ -733,6 +600,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testNoTransitionFromTerminalState() throws Exception {
expectKillTask(1);
+ expectNoCronJob(KEY_A);
control.replay();
buildScheduler();
@@ -753,6 +621,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
public void testFailedTaskIncrementsFailureCount() throws Exception {
int maxFailures = 5;
expectTaskNotThrottled().times(maxFailures - 1);
+ expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
control.replay();
buildScheduler();
@@ -785,78 +654,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
@Test
- public void testCronJobLifeCycle() throws Exception {
- SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 10, "1 1 1 1 1");
- IJobConfiguration job = sanitizedConfiguration.getJobConfig();
- expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
- assertTrue(cron.hasJob(KEY_A));
-
- // Simulate a triggering of the cron job.
- cron.startJobNow(KEY_A);
- assertTaskCount(10);
- assertEquals(10,
- getTasks(Query.jobScoped(KEY_A).byStatus(PENDING)).size());
-
- assertTaskCount(10);
-
- changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
- assertTaskCount(10);
- changeStatus(Query.roleScoped(ROLE_A), RUNNING);
- assertTaskCount(10);
- changeStatus(Query.roleScoped(ROLE_A), FINISHED);
- }
-
- @Test
- public void testCronNoSuicide() throws Exception {
- SanitizedConfiguration sanitizedConfiguration =
- makeCronJob(KEY_A, 10, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
- expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
- EasyMock.<Runnable>anyObject()))
- .andReturn("key");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(sanitizedConfiguration);
- assertTaskCount(0);
-
- try {
- scheduler.createJob(sanitizedConfiguration);
- fail();
- } catch (ScheduleException e) {
- // Expected.
- }
- assertTrue(cron.hasJob(KEY_A));
-
- // Simulate a triggering of the cron job.
- cron.startJobNow(KEY_A);
- assertTaskCount(10);
-
- Set<String> taskIds = Tasks.ids(getTasksOwnedBy(OWNER_A));
-
- // Simulate a triggering of the cron job.
- cron.startJobNow(KEY_A);
- assertTaskCount(10);
- assertTrue(Sets.intersection(taskIds, Tasks.ids(getTasksOwnedBy(OWNER_A))).isEmpty());
-
- try {
- scheduler.createJob(sanitizedConfiguration);
- fail();
- } catch (ScheduleException e) {
- // Expected.
- }
- assertTrue(cron.hasJob(KEY_A));
- }
-
- @Test
public void testKillPendingTask() throws Exception {
+ expectNoCronJob(KEY_A);
+
control.replay();
buildScheduler();
@@ -891,16 +691,12 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testKillCronTask() throws Exception {
- SanitizedConfiguration sanitizedConfiguration =
- makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
- expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
- EasyMock.<Runnable>anyObject()))
- .andReturn("key");
- cronScheduler.deschedule("key");
-
+ expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
+ cronJobManager.createJob(anyObject(SanitizedCronJob.class));
+ expect(cronJobManager.deleteJob(KEY_A)).andReturn(true);
control.replay();
buildScheduler();
- scheduler.createJob(makeCronJob(KEY_A, 1, "1 1 1 1 1"));
+ scheduler.createJob(makeCronJob(KEY_A, 1, RAW_CRONTAB_ENTRY));
// This will fail if the cron task could not be found.
scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
@@ -910,6 +706,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
public void testLostTaskRescheduled() throws Exception {
expectKillTask(2);
expectTaskNotThrottled().times(2);
+ expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
control.replay();
buildScheduler();
@@ -939,32 +736,10 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
@Test
- public void testKillNotStrictlyJobScoped() throws Exception {
- // Makes sure that queries that are not strictly job scoped will not remove the job entirely.
- SanitizedConfiguration config = makeCronJob(KEY_A, 10, "1 1 1 1 1");
- IJobConfiguration job = config.getJobConfig();
- expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("key");
- cronScheduler.deschedule("key");
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(config);
- assertTrue(cron.hasJob(KEY_A));
- cron.startJobNow(KEY_A);
- assertTaskCount(10);
-
- scheduler.killTasks(Query.instanceScoped(KEY_A, 0), USER_A);
- assertTaskCount(9);
- assertTrue(cron.hasJob(KEY_A));
-
- scheduler.killTasks(Query.jobScoped(KEY_A), USER_A);
- assertFalse(cron.hasJob(KEY_A));
- }
-
- @Test
public void testKillJob() throws Exception {
+ expectNoCronJob(KEY_A);
+ expect(cronJobManager.deleteJob(KEY_A)).andReturn(false);
+
control.replay();
buildScheduler();
@@ -1015,6 +790,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test(expected = ScheduleException.class)
public void testRestartNonexistentShard() throws Exception {
expectTaskNotThrottled();
+ expectNoCronJob(KEY_A);
control.replay();
buildScheduler();
@@ -1026,6 +802,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
@Test
public void testRestartPendingShard() throws Exception {
+ expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
+
control.replay();
buildScheduler();
@@ -1058,6 +836,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
public void testPortResourceResetAfterReschedule() throws Exception {
expectKillTask(1);
expectTaskNotThrottled();
+ expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
control.replay();
buildScheduler();
@@ -1120,6 +899,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
};
+ expectNoCronJob(KEY_A);
control.replay();
buildScheduler();
@@ -1174,9 +954,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.addInstances(
job.getKey(),
- ImmutableSet.copyOf(
- ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
- DiscreteDomain.integers())),
+ ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
+ DiscreteDomain.integers()),
job.getTaskConfig());
}
@@ -1225,6 +1004,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
.setOwner(OWNER_A);
ImmutableSet<Integer> instances = ImmutableSet.of(0);
+ expectNoCronJob(KEY_A);
control.replay();
buildScheduler();
@@ -1235,6 +1015,10 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
}
+ private void expectNoCronJob(IJobKey jobKey) throws CronException {
+ expect(cronJobManager.hasJob(jobKey)).andReturn(false);
+ }
+
private static String getLocalHost() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -1265,19 +1049,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
private static SanitizedConfiguration makeCronJob(
IJobKey jobKey,
int numDefaultTasks,
- String cronSchedule,
- CronCollisionPolicy policy) throws TaskDescriptionException {
-
- return new SanitizedConfiguration(IJobConfiguration.build(
- makeCronJob(jobKey, numDefaultTasks, cronSchedule)
- .getJobConfig()
- .newBuilder()
- .setCronCollisionPolicy(policy)));
- }
-
- private static SanitizedConfiguration makeCronJob(
- IJobKey jobKey,
- int numDefaultTasks,
String cronSchedule) throws TaskDescriptionException {
SanitizedConfiguration job = makeJob(jobKey, numDefaultTasks);
@@ -1401,4 +1172,23 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
changeStatus(Query.taskScoped(taskId), status, message);
}
+
+ private SanitizedConfiguration hasJobKey(final IJobKey key) {
+ reportMatcher(new IArgumentMatcher() {
+ @Override
+ public boolean matches(Object item) {
+ if (!(item instanceof SanitizedConfiguration)) {
+ return false;
+ }
+ SanitizedConfiguration configuration = (SanitizedConfiguration) item;
+ return key.equals(configuration.getJobConfig().getKey());
+ }
+
+ @Override
+ public void appendTo(StringBuffer buffer) {
+ buffer.append(key.toString());
+ }
+ });
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
deleted file mode 100644
index fa9cb75..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.concurrent.Executor;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class CronJobManagerTest extends EasyMockTest {
-
- private static final String OWNER = "owner";
- private static final String ENVIRONMENT = "staging11";
- private static final String JOB_NAME = "jobName";
- private static final String DEFAULT_JOB_KEY = "key";
- private static final String TASK_ID = "id";
- private static final IScheduledTask TASK = IScheduledTask.build(
- new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(TASK_ID)));
-
- private StateManager stateManager;
- private Executor delayExecutor;
- private Capture<Runnable> delayLaunchCapture;
- private StorageTestUtil storageUtil;
-
- private CronScheduler cronScheduler;
- private ShutdownRegistry shutdownRegistry;
- private CronJobManager cron;
- private IJobConfiguration job;
- private SanitizedConfiguration sanitizedConfiguration;
-
- @Before
- public void setUp() throws Exception {
- stateManager = createMock(StateManager.class);
- delayExecutor = createMock(Executor.class);
- delayLaunchCapture = createCapture();
- storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
- cronScheduler = createMock(CronScheduler.class);
- shutdownRegistry = createMock(ShutdownRegistry.class);
-
- cron = new CronJobManager(
- stateManager,
- storageUtil.storage,
- cronScheduler,
- shutdownRegistry,
- delayExecutor);
- job = makeJob();
- sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
- }
-
- private void expectJobValidated(IJobConfiguration savedJob) {
- expect(cronScheduler.isValidSchedule(savedJob.getCronSchedule())).andReturn(true);
- }
-
- private void expectJobValidated() {
- expectJobValidated(sanitizedConfiguration.getJobConfig());
- }
-
- private Capture<Runnable> expectJobAccepted(IJobConfiguration savedJob) throws Exception {
- expectJobValidated(savedJob);
- storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, savedJob);
- Capture<Runnable> jobTriggerCapture = createCapture();
- expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), capture(jobTriggerCapture)))
- .andReturn(DEFAULT_JOB_KEY);
- return jobTriggerCapture;
- }
-
- private void expectJobAccepted() throws Exception {
- expectJobAccepted(sanitizedConfiguration.getJobConfig());
- }
-
- private void expectJobFetch() {
- expectJobValidated();
- expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
- .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
- }
-
- private IExpectationSetters<?> expectActiveTaskFetch(IScheduledTask... activeTasks) {
- return storageUtil.expectTaskFetch(Query.jobScoped(job.getKey()).active(), activeTasks);
- }
-
- @Test
- public void testPubsubWiring() throws Exception {
- expect(cronScheduler.startAsync()).andReturn(cronScheduler);
- cronScheduler.awaitRunning();
- shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
- expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
- .andReturn(ImmutableList.<IJobConfiguration>of());
-
- control.replay();
-
- Injector injector = Guice.createInjector(new AbstractModule() {
- @Override
- protected void configure() {
- bind(StateManager.class).toInstance(stateManager);
- bind(Storage.class).toInstance(storageUtil.storage);
- bind(CronScheduler.class).toInstance(cronScheduler);
- bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
- PubsubTestUtil.installPubsub(binder());
- StateModule.bindCronJobManager(binder());
- }
- });
- cron = injector.getInstance(CronJobManager.class);
- EventSink eventSink = PubsubTestUtil.startPubsub(injector);
- eventSink.post(new SchedulerActive());
- }
-
- @Test
- public void testStart() throws Exception {
- expectJobAccepted();
- expectJobFetch();
- expectActiveTaskFetch();
-
- // Job is executed immediately since there are no existing tasks to kill.
- stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
- expect(cronScheduler.getSchedule(DEFAULT_JOB_KEY))
- .andReturn(Optional.of(job.getCronSchedule()))
- .times(2);
-
- control.replay();
-
- assertEquals(ImmutableMap.<IJobKey, String>of(), cron.getScheduledJobs());
- cron.receiveJob(sanitizedConfiguration);
- assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
- cron.startJobNow(job.getKey());
- assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
- }
-
- @Test
- public void testDeleteInconsistent() throws Exception {
- // Tests a case where a job exists in the storage, but is not registered with the cron system.
-
- expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
- .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
-
- control.replay();
-
- assertTrue(cron.deleteJob(job.getKey()));
- }
-
- private IExpectationSetters<?> expectTaskKilled(String id) {
- expect(stateManager.changeState(
- id,
- Optional.<ScheduleStatus>absent(),
- ScheduleStatus.KILLING,
- CronJobManager.KILL_AUDIT_MESSAGE))
- .andReturn(true);
- return expectLastCall();
- }
-
- @Test
- public void testDelayedStart() throws Exception {
- expectJobAccepted();
- expectJobFetch();
-
- // Query to test if live tasks exist for the job.
- expectActiveTaskFetch(TASK);
-
- // Live tasks exist, so the cron manager must delay the cron launch.
- delayExecutor.execute(capture(delayLaunchCapture));
-
- // The cron manager will then try to initiate the kill.
- expectTaskKilled(TASK_ID);
-
- // Immediate query and delayed query.
- expectActiveTaskFetch(TASK).times(2);
-
- // Simulate the live task disappearing.
- expectActiveTaskFetch();
-
- stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
- cron.startJobNow(job.getKey());
- assertEquals(ImmutableSet.of(job.getKey()), cron.getPendingRuns());
- delayLaunchCapture.getValue().run();
- assertEquals(ImmutableSet.<IJobKey>of(), cron.getPendingRuns());
- }
-
- @Test
- public void testDelayedStartResets() throws Exception {
- expectJobAccepted();
- expectJobFetch();
-
- // Query to test if live tasks exist for the job.
- expectActiveTaskFetch(TASK);
-
- // Live tasks exist, so the cron manager must delay the cron launch.
- delayExecutor.execute(capture(delayLaunchCapture));
-
- // The cron manager will then try to initiate the kill.
- expectTaskKilled(TASK_ID);
-
- // Immediate query and delayed query.
- expectActiveTaskFetch(TASK).times(2);
-
- // Simulate the live task disappearing.
- expectActiveTaskFetch();
-
- // Round two.
- expectJobFetch();
- expectActiveTaskFetch(TASK);
- delayExecutor.execute(capture(delayLaunchCapture));
- expectTaskKilled(TASK_ID);
- expectActiveTaskFetch(TASK).times(2);
- expectActiveTaskFetch();
-
- stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
- expectLastCall().times(2);
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
- cron.startJobNow(job.getKey());
- delayLaunchCapture.getValue().run();
-
- // Start the job again. Since the previous delayed start completed, this should repeat the
- // entire process.
- cron.startJobNow(job.getKey());
- delayLaunchCapture.getValue().run();
- }
-
- @Test
- public void testDelayedStartMultiple() throws Exception {
- expectJobAccepted();
- expectJobFetch();
-
- // Query to test if live tasks exist for the job.
- expectActiveTaskFetch(TASK).times(3);
-
- // Live tasks exist, so the cron manager must delay the cron launch.
- delayExecutor.execute(capture(delayLaunchCapture));
-
- // The cron manager will then try to initiate the kill.
- expectJobFetch();
- expectJobFetch();
- expectTaskKilled(TASK_ID).times(3);
-
- // Immediate queries and delayed query.
- expectActiveTaskFetch(TASK).times(4);
-
- // Simulate the live task disappearing.
- expectActiveTaskFetch();
-
- stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
-
- // Attempt to trick the cron manager into launching multiple times, or launching multiple
- // pollers.
- cron.startJobNow(job.getKey());
- cron.startJobNow(job.getKey());
- cron.startJobNow(job.getKey());
- delayLaunchCapture.getValue().run();
- }
-
- @Test
- public void testUpdate() throws Exception {
- SanitizedConfiguration updated = new SanitizedConfiguration(
- IJobConfiguration.build(job.newBuilder().setCronSchedule("1 2 3 4 5")));
-
- expectJobAccepted();
- cronScheduler.deschedule(DEFAULT_JOB_KEY);
- expectJobAccepted(updated.getJobConfig());
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
- cron.updateJob(updated);
- }
-
- @Test
- public void testConsistentState() throws Exception {
- IJobConfiguration updated =
- IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
-
- expectJobAccepted();
- cronScheduler.deschedule(DEFAULT_JOB_KEY);
- expectJobAccepted(updated);
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
-
- IJobConfiguration failedUpdate =
- IJobConfiguration.build(updated.newBuilder().setCronSchedule(null));
- try {
- cron.updateJob(new SanitizedConfiguration(failedUpdate));
- fail();
- } catch (ScheduleException e) {
- // Expected.
- }
-
- cron.updateJob(new SanitizedConfiguration(updated));
- }
-
- @Test
- public void testInvalidStoredJob() throws Exception {
- // Invalid jobs are left alone, but doesn't halt operation.
-
- expect(cronScheduler.startAsync()).andReturn(cronScheduler);
- cronScheduler.awaitRunning();
- shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
- IJobConfiguration jobA =
- IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5 6 7"));
- IJobConfiguration jobB =
- IJobConfiguration.build(makeJob().newBuilder().setCronSchedule(null));
-
- expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA, jobB));
- expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(false);
-
- control.replay();
-
- cron.schedulerActive(new SchedulerActive());
- }
-
- @Test(expected = IllegalStateException.class)
- public void testJobStoredTwice() throws Exception {
- // Simulate an inconsistent storage that contains two cron jobs under the same key.
-
- expect(cronScheduler.startAsync()).andReturn(cronScheduler);
- cronScheduler.awaitRunning();
- shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
- IJobConfiguration jobA =
- IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
- IJobConfiguration jobB =
- IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("* * * * *"));
- expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
- .andReturn(ImmutableList.of(jobA, jobB));
- expectJobValidated(jobA);
- expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("keyA");
- expectJobValidated(jobB);
-
- control.replay();
-
- cron.schedulerActive(new SchedulerActive());
- }
-
- @Test(expected = ScheduleException.class)
- public void testInvalidCronSchedule() throws Exception {
- expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(false);
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
- }
-
- @Test
- public void testCancelNewCollision() throws Exception {
- IJobConfiguration killExisting = IJobConfiguration.build(
- job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));
- Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
- expectActiveTaskFetch(TASK);
-
- control.replay();
-
- cron.receiveJob(new SanitizedConfiguration(killExisting));
- jobTriggerCapture.getValue().run();
- }
-
- @Test(expected = ScheduleException.class)
- public void testScheduleFails() throws Exception {
- expectJobValidated(job);
- storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, sanitizedConfiguration.getJobConfig());
- expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andThrow(new CronException("injected"));
-
- control.replay();
-
- cron.receiveJob(sanitizedConfiguration);
- }
-
- @Test(expected = ScheduleException.class)
- public void testRunOverlapRejected() throws Exception {
- IJobConfiguration killExisting = IJobConfiguration.build(
- job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
-
- control.replay();
-
- cron.receiveJob(new SanitizedConfiguration(killExisting));
- }
-
- @Test
- public void testRunOverlapLoadedSuccessfully() throws Exception {
- // Existing RUN_OVERLAP jobs should still load and map.
-
- expect(cronScheduler.startAsync()).andReturn(cronScheduler);
- cronScheduler.awaitRunning();
- shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
- IJobConfiguration jobA =
- IJobConfiguration.build(makeJob().newBuilder()
- .setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
-
- expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA));
- expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(true);
- expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
- .andReturn("keyA");
-
- control.replay();
-
- cron.schedulerActive(new SchedulerActive());
- }
-
- private IJobConfiguration makeJob() {
- return IJobConfiguration.build(new JobConfiguration()
- .setOwner(new Identity(OWNER, OWNER))
- .setKey(JobKeys.from(OWNER, ENVIRONMENT, JOB_NAME).newBuilder())
- .setCronSchedule("1 1 1 1 1")
- .setTaskConfig(defaultTask())
- .setInstanceCount(1));
- }
-
- private static TaskConfig defaultTask() {
- return new TaskConfig()
- .setContactEmail("testing@twitter.com")
- .setExecutorConfig(new ExecutorConfig("aurora", "data"))
- .setNumCpus(1)
- .setRamMb(1024)
- .setDiskMb(1024)
- .setJobName(JOB_NAME)
- .setOwner(new Identity(OWNER, OWNER))
- .setEnvironment(DEFAULT_ENVIRONMENT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index c8ad55d..cdb1f5d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -133,6 +133,6 @@ public class LockManagerImplTest extends EasyMockTest {
private void expectLockException(IJobKey key) {
expectedException.expect(LockException.class);
- expectedException.expectMessage(JobKeys.toPath(key));
+ expectedException.expectMessage(JobKeys.canonicalString(key));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 405da0a..2142f11 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -76,10 +76,13 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.quota.QuotaInfo;
import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.LockManager.LockException;
import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -137,7 +140,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
private static final IJobKey JOB_KEY = JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, JOB_NAME);
private static final ILockKey LOCK_KEY = ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
- private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("test");
+ private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
private static final Lock DEFAULT_LOCK = null;
private static final IResourceAggregate QUOTA =
@@ -678,7 +681,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void testReplaceCronTemplate() throws Exception {
expectAuth(ROLE, true);
lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
- cronJobManager.updateJob(anyObject(SanitizedConfiguration.class));
+ cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
control.replay();
assertOkResponse(thrift.replaceCronTemplate(CRON_JOB, DEFAULT_LOCK, SESSION));
@@ -707,9 +710,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void testReplaceCronTemplateDoesNotExist() throws Exception {
expectAuth(ROLE, true);
lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
- cronJobManager.updateJob(
- SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB)));
- expectLastCall().andThrow(new ScheduleException("Nope"));
+ cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
+ expectLastCall().andThrow(new CronException("Nope"));
+
control.replay();
assertResponse(INVALID_REQUEST, thrift.replaceCronTemplate(CRON_JOB, DEFAULT_LOCK, SESSION));
@@ -1004,7 +1007,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
Set<JobSummary> ownedImmedieteJobSummaryOnly = ImmutableSet.of(
new JobSummary().setJob(ownedImmediateJob).setStats(new JobStats().setActiveTaskCount(1)));
- expect(cronPredictor.predictNextRun(CRON_SCHEDULE))
+ expect(cronPredictor.predictNextRun(CrontabEntry.parse(CRON_SCHEDULE)))
.andReturn(new Date(nextCronRunMs))
.anyTimes();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 488a545..5f32f21 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -34,8 +34,8 @@ import org.apache.aurora.gen.AuroraAdmin;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.gen.SessionKey;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -149,7 +149,7 @@ public class ThriftIT extends EasyMockTest {
@Override
protected void configure() {
- bindMock(CronScheduler.class);
+ bindMock(CronJobManager.class);
bindMock(MaintenanceController.class);
bindMock(Recovery.class);
bindMock(SchedulerCore.class);