You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/05/15 04:34:42 UTC

[2/5] CronScheduler based on Quartz

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
new file mode 100644
index 0000000..e7d1c14
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobExecutionException;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AuroraCronJobTest extends EasyMockTest {
+  public static final String TASK_ID = "A";
+  private Storage storage;
+  private StateManager stateManager;
+  private CronJobManager cronJobManager;
+  private BackoffHelper backoffHelper;
+
+  private AuroraCronJob auroraCronJob;
+
+  private static final String MANAGER_ID = "MANAGER_ID";
+
+  @Before
+  public void setUp() {
+    storage = MemStorage.newEmptyStorage();
+    stateManager = createMock(StateManager.class);
+    cronJobManager = createMock(CronJobManager.class);
+    backoffHelper = createMock(BackoffHelper.class);
+
+    auroraCronJob = new AuroraCronJob(
+        new AuroraCronJob.Config(backoffHelper), storage, stateManager, cronJobManager);
+    expect(cronJobManager.getManagerKey()).andStubReturn(MANAGER_ID);
+  }
+
+  @Test
+  public void testExecuteNonexistentIsNoop() throws JobExecutionException {
+    control.replay();
+
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test
+  public void testInvalidConfigIsNoop() throws JobExecutionException {
+    control.replay();
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().saveAcceptedJob(
+            MANAGER_ID,
+            IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder().setCronSchedule(null)));
+      }
+    });
+
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test
+  public void testEmptyStorage() throws JobExecutionException {
+    // Expect exactly one insertPendingTasks call per doExecute below, three in total.
+    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+    expectLastCall().times(3);
+    control.replay();
+    // auroraCronJob holds the Storage instance from setUp(); re-save the job into that instance
+    // for each policy (assumes saveAcceptedJob overwrites the job stored under the same key).
+    populateStorage(CronCollisionPolicy.CANCEL_NEW);
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+
+    populateStorage(CronCollisionPolicy.KILL_EXISTING);
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+
+    populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test
+  public void testCancelNew() throws JobExecutionException {
+    control.replay();
+
+    populateTaskStore();
+    populateStorage(CronCollisionPolicy.CANCEL_NEW);
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test
+  public void testKillExisting() throws Exception {
+    Capture<Supplier<Boolean>> capture = createCapture();
+    // Expected calls: move task A to KILLING, backoffHelper.doUntilSuccess, insertPendingTasks.
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.KILLING,
+        AuroraCronJob.KILL_AUDIT_MESSAGE))
+        .andReturn(true);
+    backoffHelper.doUntilSuccess(EasyMock.capture(capture));
+    stateManager.insertPendingTasks(EasyMock.<Map<Integer, ITaskConfig>>anyObject());
+
+    control.replay();
+    // The captured supplier must return false while task A is stored, true after deletion.
+    populateStorage(CronCollisionPolicy.KILL_EXISTING);
+    populateTaskStore();
+    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    assertFalse(capture.getValue().get());
+    storage.write(
+        new Storage.MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(Storage.MutableStoreProvider storeProvider) {
+            storeProvider.getUnsafeTaskStore().deleteAllTasks();
+          }
+        });
+    assertTrue(capture.getValue().get());
+  }
+
+  private void populateTaskStore() {
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
+            IScheduledTask.build(new ScheduledTask()
+                .setStatus(ScheduleStatus.RUNNING)
+                .setAssignedTask(new AssignedTask()
+                    .setTaskId(TASK_ID)
+                    .setTask(QuartzTestUtil.JOB.getTaskConfig().newBuilder())))
+        ));
+      }
+    });
+  }
+
+  private void populateStorage(final CronCollisionPolicy policy) {
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().saveAcceptedJob(
+            MANAGER_ID,
+            QuartzTestUtil.makeSanitizedCronJob(policy).getSanitizedConfig().getJobConfig());
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
new file mode 100644
index 0000000..21e8278
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -0,0 +1,257 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.util.concurrent.Service;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import com.google.inject.util.Modules;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobExecutionContext;
+import org.quartz.Scheduler;
+import org.quartz.Trigger;
+import org.quartz.TriggerListener;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CronIT extends EasyMockTest {
+  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
+
+  private static final IJobKey JOB_KEY = JobKeys.from("roll", "b", "c");
+  private static final Identity IDENTITY = new Identity()
+      .setRole(JOB_KEY.getRole())
+      .setUser("user");
+
+  private static final IJobConfiguration CRON_JOB = IJobConfiguration.build(
+      new JobConfiguration()
+          .setCronSchedule(CRONTAB_ENTRY.toString())
+          .setKey(JOB_KEY.newBuilder())
+          .setInstanceCount(2)
+          .setOwner(IDENTITY)
+          .setTaskConfig(new TaskConfig()
+              .setJobName(JOB_KEY.getName())
+              .setEnvironment(JOB_KEY.getEnvironment())
+              .setOwner(IDENTITY)
+              .setExecutorConfig(new ExecutorConfig()
+                  .setName("cmd.exe")
+                  .setData("echo hello world"))
+              .setNumCpus(7)
+              .setRamMb(8)
+              .setDiskMb(9))
+  );
+
+  private ShutdownRegistry shutdownRegistry;
+  private EventSink eventSink;
+  private Injector injector;
+  private StateManager stateManager;
+  private Storage storage;
+  private AuroraCronJob auroraCronJob;
+
+  private Capture<ExceptionalCommand<?>> shutdown;
+
+  @Before
+  public void setUp() throws Exception {
+    shutdownRegistry = createMock(ShutdownRegistry.class);
+    stateManager = createMock(StateManager.class);
+    storage = MemStorage.newEmptyStorage();
+    auroraCronJob = createMock(AuroraCronJob.class);
+
+    injector = Guice.createInjector(
+        // Override to verify that Guice is actually used for construction of the AuroraCronJob.
+        // TODO(ksweeney): Use the production class here.
+        Modules.override(new CronModule()).with(new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(AuroraCronJob.class).toInstance(auroraCronJob);
+          }
+        }), new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+            bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+            bind(StateManager.class).toInstance(stateManager);
+            bind(Storage.class).toInstance(storage);
+
+            PubsubTestUtil.installPubsub(binder());
+          }
+        });
+    eventSink = PubsubTestUtil.startPubsub(injector);
+
+    shutdown = createCapture();
+    shutdownRegistry.addAction(capture(shutdown));
+  }
+
+  private void boot() {
+    eventSink.post(new PubsubEvent.SchedulerActive());
+  }
+
+  @Test
+  public void testCronSchedulerLifecycle() throws Exception {
+    control.replay();
+    // Before the SchedulerActive event is posted the Quartz scheduler must not be started.
+    Scheduler scheduler = injector.getInstance(Scheduler.class);
+    assertTrue(!scheduler.isStarted());
+    // boot() posts SchedulerActive, after which the lifecycle service and scheduler should run.
+    boot();
+    Service cronLifecycle = injector.getInstance(CronLifecycle.class);
+
+    assertTrue(cronLifecycle.isRunning());
+    assertTrue(scheduler.isStarted());
+    // Running the action captured from the ShutdownRegistry should stop both of them.
+    shutdown.getValue().execute();
+
+    assertTrue(!cronLifecycle.isRunning());
+    assertTrue(scheduler.isShutdown());
+  }
+
+  @Test
+  public void testJobsAreScheduled() throws Exception {
+    auroraCronJob.execute(isA(JobExecutionContext.class));
+
+    control.replay();
+    final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
+    final Scheduler scheduler = injector.getInstance(Scheduler.class);
+
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getJobStore().saveAcceptedJob(
+            cronJobManager.getManagerKey(),
+            CRON_JOB);
+      }
+    });
+
+    final CountDownLatch cronRan = new CountDownLatch(1);
+    scheduler.getListenerManager().addTriggerListener(new CountDownWhenComplete(cronRan));
+    boot();
+
+    cronRan.await();
+
+    shutdown.getValue().execute();
+  }
+
+  @Test
+  public void testKillExistingDogpiles() throws Exception {
+    // A second trigger must not start executing while the first execution is still running.
+    final CronJobManager cronJobManager = injector.getInstance(CronJobManager.class);
+
+    final CountDownLatch firstExecutionTriggered = new CountDownLatch(1);
+    final CountDownLatch firstExecutionCompleted = new CountDownLatch(1);
+    final CountDownLatch secondExecutionTriggered = new CountDownLatch(1);
+    final CountDownLatch secondExecutionCompleted = new CountDownLatch(1);
+
+    auroraCronJob.execute(isA(JobExecutionContext.class));
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() throws Throwable {
+        firstExecutionTriggered.countDown();
+        firstExecutionCompleted.await();
+        return null;
+      }
+    });
+    auroraCronJob.execute(isA(JobExecutionContext.class));
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() throws Throwable {
+        secondExecutionTriggered.countDown();
+        secondExecutionCompleted.await();
+        return null;
+      }
+    });
+
+    control.replay();
+    boot();
+
+    cronJobManager.createJob(SanitizedCronJob.fromUnsanitized(CRON_JOB));
+    cronJobManager.startJobNow(JOB_KEY);
+    firstExecutionTriggered.await();
+    cronJobManager.startJobNow(JOB_KEY);
+    assertEquals(1, secondExecutionTriggered.getCount());
+    firstExecutionCompleted.countDown();
+    secondExecutionTriggered.await();
+    // Release the second execution, which is blocked on this latch inside its answer.
+    secondExecutionCompleted.countDown();
+  }
+
+  /** Quartz trigger listener that counts down the given latch when a trigger completes. */
+  private static class CountDownWhenComplete implements TriggerListener {
+    private final CountDownLatch countDownLatch;
+
+    CountDownWhenComplete(CountDownLatch countDownLatch) {
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public String getName() {
+      return CountDownWhenComplete.class.getName();
+    }
+
+    @Override
+    public void triggerFired(Trigger trigger, JobExecutionContext context) {
+    }
+
+    @Override
+    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
+      return false;
+    }
+
+    @Override
+    public void triggerMisfired(Trigger trigger) {
+      // No-op.
+    }
+
+    @Override
+    public void triggerComplete(
+        Trigger trigger,
+        JobExecutionContext context,
+        Trigger.CompletedExecutionInstruction triggerInstructionCode) {
+      // Signal any thread awaiting the latch that the trigger has completed.
+      countDownLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
new file mode 100644
index 0000000..b9116a4
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -0,0 +1,221 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.Trigger;
+import org.quartz.impl.matchers.GroupMatcher;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CronJobManagerImplTest extends EasyMockTest {
+  private Storage storage;
+  private Scheduler scheduler;
+
+  private CronJobManager cronJobManager;
+
+  @Before
+  public void setUp() {
+    storage = MemStorage.newEmptyStorage();
+    scheduler = createMock(Scheduler.class);
+
+    cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));
+  }
+
+  @Test
+  public void testStartJobNowExistent() throws Exception {
+    populateStorage();
+    scheduler.triggerJob(QuartzTestUtil.QUARTZ_JOB_KEY);
+
+    control.replay();
+
+    cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test(expected = CronException.class)
+  public void testStartJobNowNonexistent() throws Exception {
+    control.replay();
+
+    cronJobManager.startJobNow(QuartzTestUtil.AURORA_JOB_KEY);
+  }
+
+  @Test
+  public void testUpdateExistingJob() throws Exception {
+    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+
+    expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
+    expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
+       .andReturn(null);
+
+    populateStorage();
+
+    control.replay();
+
+    cronJobManager.updateJob(sanitizedCronJob);
+    assertEquals(sanitizedCronJob.getSanitizedConfig().getJobConfig(), fetchFromStorage().orNull());
+  }
+
+  @Test
+  public void testUpdateNonexistentJob() throws Exception {
+    control.replay();
+
+    try {
+      cronJobManager.updateJob(QuartzTestUtil.makeUpdatedJob());
+      fail();
+    } catch (CronException e) {
+      // Expected.
+    }
+
+    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+  }
+
+  @Test
+  public void testCreateNonexistentJob() throws Exception {
+    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+
+    expect(scheduler.scheduleJob(anyObject(JobDetail.class), anyObject(Trigger.class)))
+        .andReturn(null);
+
+    control.replay();
+
+    cronJobManager.createJob(sanitizedCronJob);
+
+    assertEquals(
+        sanitizedCronJob.getSanitizedConfig().getJobConfig(),
+        fetchFromStorage().orNull());
+  }
+
+  @Test(expected = CronException.class)
+  public void testCreateExistingJobFails() throws Exception {
+    SanitizedCronJob sanitizedCronJob = QuartzTestUtil.makeSanitizedCronJob();
+    populateStorage();
+    control.replay();
+
+    cronJobManager.createJob(sanitizedCronJob);
+  }
+
+  @Test
+  public void testGetJobs() throws Exception {
+    control.replay();
+    assertEquals(Collections.emptyList(), ImmutableList.copyOf(cronJobManager.getJobs()));
+
+    populateStorage();
+    assertEquals(
+        QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig(),
+        Iterables.getOnlyElement(cronJobManager.getJobs()));
+  }
+
+  @Test
+  public void testNoRunOverlap() throws Exception {
+    // Neither createJob nor updateJob may accept a RUN_OVERLAP job; storage must stay empty.
+    SanitizedCronJob runOverlapJob = SanitizedCronJob.fromUnsanitized(IJobConfiguration.build(
+        QuartzTestUtil.JOB.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP)));
+
+    control.replay();
+    try {
+      cronJobManager.createJob(runOverlapJob);
+      fail();
+    } catch (CronException e) {
+      // Expected.
+    }
+
+    try {
+      cronJobManager.updateJob(runOverlapJob);
+      fail();
+    } catch (CronException e) {
+      // Expected.
+    }
+
+    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+  }
+
+  @Test
+  public void testDeleteJob() throws Exception {
+    expect(scheduler.deleteJob(QuartzTestUtil.QUARTZ_JOB_KEY)).andReturn(true);
+
+    control.replay();
+    // deleteJob should return false while no job is stored and true once one has been saved.
+    assertFalse(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
+    populateStorage();
+    assertTrue(cronJobManager.deleteJob(QuartzTestUtil.AURORA_JOB_KEY));
+    assertEquals(Optional.<IJobConfiguration>absent(), fetchFromStorage());
+  }
+
+  @Test
+  public void testGetScheduledJobs() throws Exception {
+    JobDetail jobDetail = createMock(JobDetail.class);
+    expect(scheduler.getJobKeys(EasyMock.<GroupMatcher<JobKey>>anyObject()))
+        .andReturn(ImmutableSet.of(QuartzTestUtil.QUARTZ_JOB_KEY));
+    expect(scheduler.getJobDetail(QuartzTestUtil.QUARTZ_JOB_KEY))
+        .andReturn(jobDetail);
+    expect(jobDetail.getDescription()).andReturn("* * * * *");
+
+    control.replay();
+
+    Map<IJobKey, CrontabEntry> scheduledJobs = cronJobManager.getScheduledJobs();
+    assertEquals(CrontabEntry.parse("* * * * *"), scheduledJobs.get(QuartzTestUtil.AURORA_JOB_KEY));
+  }
+
+  private void populateStorage() throws Exception {
+    storage.write(new Storage.MutateWork.NoResult<Exception>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) throws Exception {
+        storeProvider.getJobStore().saveAcceptedJob(
+            cronJobManager.getManagerKey(),
+            QuartzTestUtil.makeSanitizedCronJob().getSanitizedConfig().getJobConfig());
+      }
+    });
+  }
+
+  private Optional<IJobConfiguration> fetchFromStorage() {
+    return storage.consistentRead(new Storage.Work.Quiet<Optional<IJobConfiguration>>() {
+      @Override
+      public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
+        return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(),
+            QuartzTestUtil.AURORA_JOB_KEY);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
new file mode 100644
index 0000000..6bc97f3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImplTest.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+
+import org.apache.aurora.scheduler.cron.ExpectedPrediction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CronPredictorImplTest {
+  private static final TimeZone TIME_ZONE = TimeZone.getTimeZone("GMT");
+  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("* * * * *");
+
+  private CronPredictor cronPredictor;
+
+  private FakeClock clock;
+
+  @Before
+  public void setUp() {
+    clock = new FakeClock();
+    cronPredictor = new CronPredictorImpl(clock, TIME_ZONE);
+  }
+
+  @Test
+  public void testValidSchedule() {
+    clock.advance(Amount.of(1L, Time.DAYS));
+    Date expectedPrediction = new Date(Amount.of(1L, Time.DAYS).as(Time.MILLISECONDS)
+            + Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS));
+    assertEquals(expectedPrediction, cronPredictor.predictNextRun(CrontabEntry.parse("* * * * *")));
+  }
+
+  @Test
+  public void testCronExpressions() {
+    assertEquals("0 * * ? * 1,2,3,4,5,6,7",
+        Quartz.cronExpression(CRONTAB_ENTRY, TIME_ZONE).getCronExpression());
+  }
+
+  @Test
+  public void testCronPredictorConforms() throws Exception {
+    for (ExpectedPrediction expectedPrediction : ExpectedPrediction.getAll()) {
+      List<Date> results = Lists.newArrayList();
+      clock.setNowMillis(0);
+      for (int i = 0; i < expectedPrediction.getTriggerTimes().size(); i++) {
+        Date nextTriggerTime = cronPredictor.predictNextRun(expectedPrediction.parseCrontabEntry());
+        results.add(nextTriggerTime);
+        clock.setNowMillis(nextTriggerTime.getTime());
+      }
+      assertEquals(
+          "Cron schedule " + expectedPrediction.getSchedule() + " made unexpected predictions.",
+          Lists.transform(
+              expectedPrediction.getTriggerTimes(),
+              new Function<Long, Date>() {
+                @Override
+                public Date apply(Long time) {
+                  return new Date(time);
+                }
+              }
+          ),
+          results);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
new file mode 100644
index 0000000..b10f514
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import com.google.common.base.Throwables;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.JobKey;
+
+/**
+ * Fixtures used across quartz tests.
+ */
+final class QuartzTestUtil {
+  static final IJobKey AURORA_JOB_KEY = JobKeys.from("role", "env", "job");
+  static final IJobConfiguration JOB = IJobConfiguration.build(
+      new JobConfiguration()
+          .setCronSchedule("* * * * SUN")
+          .setInstanceCount(10)
+          .setOwner(new Identity("role", "user"))
+          .setKey(AURORA_JOB_KEY.newBuilder())
+          .setTaskConfig(new TaskConfig()
+              .setOwner(new Identity("role", "user"))
+              .setJobName(AURORA_JOB_KEY.getName())
+              .setEnvironment(AURORA_JOB_KEY.getEnvironment())
+              .setDiskMb(3)
+              .setRamMb(4)
+              .setNumCpus(5)
+              .setExecutorConfig(new ExecutorConfig()
+                  .setName("cmd.exe")
+                  .setData("echo hello world")))
+  );
+  static final JobKey QUARTZ_JOB_KEY = Quartz.jobKey(AURORA_JOB_KEY);
+
+  private QuartzTestUtil() {
+    // Utility class.
+  }
+
+  static SanitizedCronJob makeSanitizedCronJob(CronCollisionPolicy collisionPolicy) {
+    try {
+      return SanitizedCronJob.fromUnsanitized(
+          IJobConfiguration.build(JOB.newBuilder().setCronCollisionPolicy(collisionPolicy)));
+    } catch (CronException | ConfigurationManager.TaskDescriptionException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  static SanitizedCronJob makeSanitizedCronJob() {
+    return makeSanitizedCronJob(CronCollisionPolicy.KILL_EXISTING);
+  }
+
+  static SanitizedCronJob makeUpdatedJob() throws Exception {
+    return SanitizedCronJob.fromUnsanitized(
+        IJobConfiguration.build(JOB.newBuilder().setCronSchedule("* * 1 * *")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index da6c0ff..0e17f49 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -38,15 +38,12 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.ExecutorConfig;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobConfiguration;
@@ -68,7 +65,10 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
@@ -85,6 +85,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -106,9 +107,9 @@ import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFIC
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.reportMatcher;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -128,25 +129,23 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private static final IJobKey KEY_A = JobKeys.from(ROLE_A, ENV_A, JOB_A);
   private static final int ONE_GB = 1024;
 
-  private static final String ROLE_B = "Test_Role_B";
-  private static final IJobKey KEY_B = JobKeys.from(ROLE_B, ENV_A, JOB_A);
-
   private static final SlaveID SLAVE_ID = SlaveID.newBuilder().setValue("SlaveId").build();
   private static final String SLAVE_HOST_1 = "SlaveHost1";
 
   private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
 
+  public static final CrontabEntry CRONTAB_ENTRY = CrontabEntry.parse("1 1 1 1 *");
+  public static final String RAW_CRONTAB_ENTRY = CRONTAB_ENTRY.toString();
+
   private Driver driver;
   private StateManagerImpl stateManager;
   private Storage storage;
   private SchedulerCoreImpl scheduler;
-  private CronScheduler cronScheduler;
-  private CronJobManager cron;
+  private CronJobManager cronJobManager;
   private FakeClock clock;
   private EventSink eventSink;
   private RescheduleCalculator rescheduleCalculator;
-  private ShutdownRegistry shutdownRegistry;
   private QuotaManager quotaManager;
 
   // TODO(William Farner): Set up explicit expectations for calls to generate task IDs.
@@ -164,18 +163,15 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     clock = new FakeClock();
     eventSink = createMock(EventSink.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
-    cronScheduler = createMock(CronScheduler.class);
-    shutdownRegistry = createMock(ShutdownRegistry.class);
+    cronJobManager = createMock(CronJobManager.class);
     quotaManager = createMock(QuotaManager.class);
 
     eventSink.post(EasyMock.<PubsubEvent>anyObject());
     expectLastCall().anyTimes();
-    expect(cronScheduler.schedule(anyObject(String.class), anyObject(Runnable.class)))
-        .andStubReturn("key");
-    expect(cronScheduler.isValidSchedule(anyObject(String.class))).andStubReturn(true);
 
     expect(quotaManager.checkQuota(anyObject(ITaskConfig.class), anyInt()))
         .andStubReturn(ENOUGH_QUOTA);
+    expect(cronJobManager.hasJob(anyObject(IJobKey.class))).andStubReturn(false);
   }
 
   /**
@@ -208,15 +204,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         taskIdGenerator,
         eventSink,
         rescheduleCalculator);
-    cron = new CronJobManager(
-        stateManager,
-        storage,
-        cronScheduler,
-        shutdownRegistry,
-        MoreExecutors.sameThreadExecutor());
     scheduler = new SchedulerCoreImpl(
         storage,
-        cron,
+        cronJobManager,
         stateManager,
         taskIdGenerator,
         quotaManager);
@@ -250,6 +240,19 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet());
   }
 
+  /**
+   * Creates a job whose cron schedule is the empty string and expects exactly one task to
+   * exist afterwards. No additional mock expectations are recorded before replay.
+   */
+  @Test
+  public void testCreateJobEmptyString() throws Exception {
+    // TODO(ksweeney): Deprecate this as part of AURORA-423.
+
+    control.replay();
+    buildScheduler();
+
+    JobConfiguration emptySchedule =
+        makeJob(KEY_A, 1).getJobConfig().newBuilder().setCronSchedule("");
+    scheduler.createJob(
+        SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(emptySchedule)));
+    assertTaskCount(1);
+  }
+
   private static Constraint dedicatedConstraint(Set<String> values) {
     return new Constraint(DEDICATED_ATTRIBUTE,
         TaskConstraint.value(new ValueConstraint(false, values)));
@@ -272,7 +275,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     buildScheduler();
 
     TaskConfig newTask = nonProductionTask();
-    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.toPath(KEY_A))));
+    newTask.addToConstraints(dedicatedConstraint(ImmutableSet.of(JobKeys.canonicalString(KEY_A))));
     scheduler.createJob(makeJob(KEY_A, newTask));
     assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
   }
@@ -445,147 +448,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.createJob(makeJob(KEY_A, 1));
   }
 
-  @Test(expected = ScheduleException.class)
-  public void testCreateDuplicateCronJob() throws Exception {
-    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
-
-    control.replay();
-    buildScheduler();
-
-    // Cron jobs are scheduled on a delay, so this job's tasks will not be scheduled immediately,
-    // but duplicate jobs should still be rejected.
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-  }
-
-  @Test
-  public void testStartCronJob() throws Exception {
-    // Create a cron job, ask the scheduler to start it, and ensure that the tasks exist
-    // in the PENDING state.
-
-    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
-    IJobKey jobKey = sanitizedConfiguration.getJobConfig().getKey();
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-
-    cron.startJobNow(jobKey);
-    assertEquals(PENDING, getOnlyTask(Query.jobScoped(jobKey)).getStatus());
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testStartNonexistentCronJob() throws Exception {
-    // Try to start a cron job that doesn't exist.
-    control.replay();
-    buildScheduler();
-
-    cron.startJobNow(KEY_A);
-  }
-
-  @Test
-  public void testStartNonCronJob() throws Exception {
-    // Create a NON cron job and try to start it as though it were a cron job, and ensure that
-    // no cron tasks are created.
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, 1));
-    String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
-
-    try {
-      cron.startJobNow(KEY_A);
-      fail("Start should have failed.");
-    } catch (ScheduleException e) {
-      // Expected.
-    }
-
-    assertEquals(PENDING, getTask(taskId).getStatus());
-    assertFalse(cron.hasJob(KEY_A));
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testStartNonOwnedCronJob() throws Exception {
-    // Try to start a cron job that is not owned by us.
-    // Should throw an exception.
-
-    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(true);
-    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-
-    cron.startJobNow(KEY_B);
-  }
-
-  @Test
-  public void testStartRunningCronJob() throws Exception {
-    // Start a cron job that is already started by an earlier
-    // call and is PENDING. Make sure it follows the cron collision policy.
-    SanitizedConfiguration sanitizedConfiguration =
-        makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
-    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
-        EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-    assertTrue(cron.hasJob(KEY_A));
-
-    cron.startJobNow(KEY_A);
-    assertTaskCount(1);
-
-    String taskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
-
-    // Now start the same cron job immediately.
-    cron.startJobNow(KEY_A);
-    assertTaskCount(1);
-    assertEquals(PENDING, getOnlyTask(Query.jobScoped(KEY_A)).getStatus());
-
-    // Make sure the pending job is the new one.
-    String newTaskId = Tasks.id(getOnlyTask(Query.jobScoped(KEY_A)));
-    assertFalse(taskId.equals(newTaskId));
-  }
-
-  @Test
-  public void testKillCreateCronJob() throws Exception {
-    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 1, "1 1 1 1 1");
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-    cronScheduler.deschedule("key");
-
-    SanitizedConfiguration updated = makeCronJob(KEY_A, 1, "1 2 3 4 5");
-    IJobConfiguration updatedJob = updated.getJobConfig();
-    expect(cronScheduler.schedule(eq(updatedJob.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key2");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTrue(cron.hasJob(KEY_A));
-
-    scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
-    scheduler.createJob(updated);
-
-    IJobConfiguration stored = Iterables.getOnlyElement(cron.getJobs());
-    assertEquals(updatedJob.getCronSchedule(), stored.getCronSchedule());
-  }
-
   @Test
   public void testKillTask() throws Exception {
     driver.killTask(EasyMock.<String>anyObject());
@@ -638,17 +500,21 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testServiceTasksRescheduled() throws Exception {
     int numServiceTasks = 5;
+    IJobKey adhocKey = KEY_A;
+    IJobKey serviceKey = IJobKey.build(
+        adhocKey.newBuilder().setName(adhocKey.getName() + "service"));
 
     expectTaskNotThrottled().times(numServiceTasks);
+    expectNoCronJob(adhocKey);
+    expectNoCronJob(serviceKey);
 
     control.replay();
     buildScheduler();
 
     // Schedule 5 service and 5 non-service tasks.
-    scheduler.createJob(makeJob(KEY_A, numServiceTasks));
+    scheduler.createJob(makeJob(adhocKey, numServiceTasks));
     TaskConfig task = productionTask().setIsService(true);
-    scheduler.createJob(
-        makeJob(IJobKey.build(KEY_A.newBuilder().setName(KEY_A.getName() + "service")), task, 5));
+    scheduler.createJob(makeJob(serviceKey, task, 5));
 
     assertEquals(10, getTasksByStatus(PENDING).size());
     changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
@@ -675,6 +541,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     int totalFailures = 10;
 
     expectTaskNotThrottled().times(totalFailures);
+    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -733,6 +600,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test
   public void testNoTransitionFromTerminalState() throws Exception {
     expectKillTask(1);
+    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -753,6 +621,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testFailedTaskIncrementsFailureCount() throws Exception {
     int maxFailures = 5;
     expectTaskNotThrottled().times(maxFailures - 1);
+    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -785,78 +654,9 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testCronJobLifeCycle() throws Exception {
-    SanitizedConfiguration sanitizedConfiguration = makeCronJob(KEY_A, 10, "1 1 1 1 1");
-    IJobConfiguration job = sanitizedConfiguration.getJobConfig();
-    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-    assertTrue(cron.hasJob(KEY_A));
-
-    // Simulate a triggering of the cron job.
-    cron.startJobNow(KEY_A);
-    assertTaskCount(10);
-    assertEquals(10,
-        getTasks(Query.jobScoped(KEY_A).byStatus(PENDING)).size());
-
-    assertTaskCount(10);
-
-    changeStatus(Query.roleScoped(ROLE_A), ASSIGNED, STARTING);
-    assertTaskCount(10);
-    changeStatus(Query.roleScoped(ROLE_A), RUNNING);
-    assertTaskCount(10);
-    changeStatus(Query.roleScoped(ROLE_A), FINISHED);
-  }
-
-  @Test
-  public void testCronNoSuicide() throws Exception {
-    SanitizedConfiguration sanitizedConfiguration =
-        makeCronJob(KEY_A, 10, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
-    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
-        EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(sanitizedConfiguration);
-    assertTaskCount(0);
-
-    try {
-      scheduler.createJob(sanitizedConfiguration);
-      fail();
-    } catch (ScheduleException e) {
-      // Expected.
-    }
-    assertTrue(cron.hasJob(KEY_A));
-
-    // Simulate a triggering of the cron job.
-    cron.startJobNow(KEY_A);
-    assertTaskCount(10);
-
-    Set<String> taskIds = Tasks.ids(getTasksOwnedBy(OWNER_A));
-
-    // Simulate a triggering of the cron job.
-    cron.startJobNow(KEY_A);
-    assertTaskCount(10);
-    assertTrue(Sets.intersection(taskIds, Tasks.ids(getTasksOwnedBy(OWNER_A))).isEmpty());
-
-    try {
-      scheduler.createJob(sanitizedConfiguration);
-      fail();
-    } catch (ScheduleException e) {
-      // Expected.
-    }
-    assertTrue(cron.hasJob(KEY_A));
-  }
-
-  @Test
   public void testKillPendingTask() throws Exception {
+    expectNoCronJob(KEY_A);
+
     control.replay();
     buildScheduler();
 
@@ -891,16 +691,12 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testKillCronTask() throws Exception {
-    SanitizedConfiguration sanitizedConfiguration =
-        makeCronJob(KEY_A, 1, "1 1 1 1 1", CronCollisionPolicy.KILL_EXISTING);
-    expect(cronScheduler.schedule(eq(sanitizedConfiguration.getJobConfig().getCronSchedule()),
-        EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-    cronScheduler.deschedule("key");
-
+    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
+    cronJobManager.createJob(anyObject(SanitizedCronJob.class));
+    expect(cronJobManager.deleteJob(KEY_A)).andReturn(true);
     control.replay();
     buildScheduler();
-    scheduler.createJob(makeCronJob(KEY_A, 1, "1 1 1 1 1"));
+    scheduler.createJob(makeCronJob(KEY_A, 1, RAW_CRONTAB_ENTRY));
 
     // This will fail if the cron task could not be found.
     scheduler.killTasks(Query.jobScoped(KEY_A), OWNER_A.getUser());
@@ -910,6 +706,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testLostTaskRescheduled() throws Exception {
     expectKillTask(2);
     expectTaskNotThrottled().times(2);
+    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -939,32 +736,10 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testKillNotStrictlyJobScoped() throws Exception {
-    // Makes sure that queries that are not strictly job scoped will not remove the job entirely.
-    SanitizedConfiguration config = makeCronJob(KEY_A, 10, "1 1 1 1 1");
-    IJobConfiguration job = config.getJobConfig();
-    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("key");
-    cronScheduler.deschedule("key");
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(config);
-    assertTrue(cron.hasJob(KEY_A));
-    cron.startJobNow(KEY_A);
-    assertTaskCount(10);
-
-    scheduler.killTasks(Query.instanceScoped(KEY_A, 0), USER_A);
-    assertTaskCount(9);
-    assertTrue(cron.hasJob(KEY_A));
-
-    scheduler.killTasks(Query.jobScoped(KEY_A), USER_A);
-    assertFalse(cron.hasJob(KEY_A));
-  }
-
-  @Test
   public void testKillJob() throws Exception {
+    expectNoCronJob(KEY_A);
+    expect(cronJobManager.deleteJob(KEY_A)).andReturn(false);
+
     control.replay();
     buildScheduler();
 
@@ -1015,6 +790,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   @Test(expected = ScheduleException.class)
   public void testRestartNonexistentShard() throws Exception {
     expectTaskNotThrottled();
+    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -1026,6 +802,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
   @Test
   public void testRestartPendingShard() throws Exception {
+    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
+
     control.replay();
     buildScheduler();
 
@@ -1058,6 +836,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void testPortResourceResetAfterReschedule() throws Exception {
     expectKillTask(1);
     expectTaskNotThrottled();
+    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
 
     control.replay();
     buildScheduler();
@@ -1120,6 +899,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
       }
     };
 
+    expectNoCronJob(KEY_A);
     control.replay();
     buildScheduler();
 
@@ -1174,9 +954,8 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
 
     scheduler.addInstances(
         job.getKey(),
-        ImmutableSet.copyOf(
-            ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
-                DiscreteDomain.integers())),
+        ContiguousSet.create(Range.closed(0, SchedulerCoreImpl.MAX_TASKS_PER_JOB.get()),
+            DiscreteDomain.integers()),
         job.getTaskConfig());
   }
 
@@ -1225,6 +1004,7 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
         .setOwner(OWNER_A);
 
     ImmutableSet<Integer> instances = ImmutableSet.of(0);
+    expectNoCronJob(KEY_A);
 
     control.replay();
     buildScheduler();
@@ -1235,6 +1015,10 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
     scheduler.addInstances(KEY_A, instances, ITaskConfig.build(newTask));
   }
 
+  /**
+   * Records an expectation that {@code cronJobManager.hasJob(jobKey)} is called and answers
+   * {@code false}. The visible callers invoke this before {@code control.replay()}.
+   *
+   * @param jobKey Job key the mock cron job manager is expected to be asked about.
+   */
+  private void expectNoCronJob(IJobKey jobKey) throws CronException {
+    expect(cronJobManager.hasJob(jobKey)).andReturn(false);
+  }
+
   private static String getLocalHost() {
     try {
       return InetAddress.getLocalHost().getHostName();
@@ -1265,19 +1049,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   private static SanitizedConfiguration makeCronJob(
       IJobKey jobKey,
       int numDefaultTasks,
-      String cronSchedule,
-      CronCollisionPolicy policy) throws TaskDescriptionException {
-
-    return new SanitizedConfiguration(IJobConfiguration.build(
-        makeCronJob(jobKey, numDefaultTasks, cronSchedule)
-            .getJobConfig()
-            .newBuilder()
-            .setCronCollisionPolicy(policy)));
-  }
-
-  private static SanitizedConfiguration makeCronJob(
-      IJobKey jobKey,
-      int numDefaultTasks,
       String cronSchedule) throws TaskDescriptionException {
 
     SanitizedConfiguration job = makeJob(jobKey, numDefaultTasks);
@@ -1401,4 +1172,23 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   public void changeStatus(String taskId, ScheduleStatus status, Optional<String> message) {
     changeStatus(Query.taskScoped(taskId), status, message);
   }
+
+  /**
+   * Registers an EasyMock argument matcher that accepts any {@link SanitizedConfiguration}
+   * whose job key equals {@code key}; arguments of any other type are rejected.
+   *
+   * @param key Job key the matched configuration must carry.
+   * @return Always {@code null}; the matcher itself is recorded through {@code reportMatcher}.
+   */
+  private SanitizedConfiguration hasJobKey(final IJobKey key) {
+    reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object item) {
+        return item instanceof SanitizedConfiguration
+            && key.equals(((SanitizedConfiguration) item).getJobConfig().getKey());
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // Shown in EasyMock failure messages to describe the expected argument.
+        buffer.append(key.toString());
+      }
+    });
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
deleted file mode 100644
index fa9cb75..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/CronJobManagerTest.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.concurrent.Executor;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static org.apache.aurora.scheduler.state.CronJobManager.MANAGER_KEY;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class CronJobManagerTest extends EasyMockTest {
-
-  private static final String OWNER = "owner";
-  private static final String ENVIRONMENT = "staging11";
-  private static final String JOB_NAME = "jobName";
-  private static final String DEFAULT_JOB_KEY = "key";
-  private static final String TASK_ID = "id";
-  private static final IScheduledTask TASK = IScheduledTask.build(
-      new ScheduledTask().setAssignedTask(new AssignedTask().setTaskId(TASK_ID)));
-
-  private StateManager stateManager;
-  private Executor delayExecutor;
-  private Capture<Runnable> delayLaunchCapture;
-  private StorageTestUtil storageUtil;
-
-  private CronScheduler cronScheduler;
-  private ShutdownRegistry shutdownRegistry;
-  private CronJobManager cron;
-  private IJobConfiguration job;
-  private SanitizedConfiguration sanitizedConfiguration;
-
-  @Before
-  public void setUp() throws Exception {
-    stateManager = createMock(StateManager.class);
-    delayExecutor = createMock(Executor.class);
-    delayLaunchCapture = createCapture();
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    cronScheduler = createMock(CronScheduler.class);
-    shutdownRegistry = createMock(ShutdownRegistry.class);
-
-    cron = new CronJobManager(
-        stateManager,
-        storageUtil.storage,
-        cronScheduler,
-        shutdownRegistry,
-        delayExecutor);
-    job = makeJob();
-    sanitizedConfiguration = SanitizedConfiguration.fromUnsanitized(job);
-  }
-
-  private void expectJobValidated(IJobConfiguration savedJob) {
-    expect(cronScheduler.isValidSchedule(savedJob.getCronSchedule())).andReturn(true);
-  }
-
-  private void expectJobValidated() {
-    expectJobValidated(sanitizedConfiguration.getJobConfig());
-  }
-
-  private Capture<Runnable> expectJobAccepted(IJobConfiguration savedJob) throws Exception {
-    expectJobValidated(savedJob);
-    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, savedJob);
-    Capture<Runnable> jobTriggerCapture = createCapture();
-    expect(cronScheduler.schedule(eq(savedJob.getCronSchedule()), capture(jobTriggerCapture)))
-        .andReturn(DEFAULT_JOB_KEY);
-    return jobTriggerCapture;
-  }
-
-  private void expectJobAccepted() throws Exception {
-    expectJobAccepted(sanitizedConfiguration.getJobConfig());
-  }
-
-  private void expectJobFetch() {
-    expectJobValidated();
-    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
-        .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
-  }
-
-  private IExpectationSetters<?> expectActiveTaskFetch(IScheduledTask... activeTasks) {
-    return storageUtil.expectTaskFetch(Query.jobScoped(job.getKey()).active(), activeTasks);
-  }
-
-  @Test
-  public void testPubsubWiring() throws Exception {
-    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
-    cronScheduler.awaitRunning();
-    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
-        .andReturn(ImmutableList.<IJobConfiguration>of());
-
-    control.replay();
-
-    Injector injector = Guice.createInjector(new AbstractModule() {
-      @Override
-      protected void configure() {
-        bind(StateManager.class).toInstance(stateManager);
-        bind(Storage.class).toInstance(storageUtil.storage);
-        bind(CronScheduler.class).toInstance(cronScheduler);
-        bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
-        PubsubTestUtil.installPubsub(binder());
-        StateModule.bindCronJobManager(binder());
-      }
-    });
-    cron = injector.getInstance(CronJobManager.class);
-    EventSink eventSink = PubsubTestUtil.startPubsub(injector);
-    eventSink.post(new SchedulerActive());
-  }
-
-  @Test
-  public void testStart() throws Exception {
-    expectJobAccepted();
-    expectJobFetch();
-    expectActiveTaskFetch();
-
-    // Job is executed immediately since there are no existing tasks to kill.
-    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-    expect(cronScheduler.getSchedule(DEFAULT_JOB_KEY))
-        .andReturn(Optional.of(job.getCronSchedule()))
-        .times(2);
-
-    control.replay();
-
-    assertEquals(ImmutableMap.<IJobKey, String>of(), cron.getScheduledJobs());
-    cron.receiveJob(sanitizedConfiguration);
-    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
-    cron.startJobNow(job.getKey());
-    assertEquals(ImmutableMap.of(job.getKey(), job.getCronSchedule()), cron.getScheduledJobs());
-  }
-
-  @Test
-  public void testDeleteInconsistent() throws Exception {
-    // Tests a case where a job exists in the storage, but is not registered with the cron system.
-
-    expect(storageUtil.jobStore.fetchJob(MANAGER_KEY, job.getKey()))
-        .andReturn(Optional.of(sanitizedConfiguration.getJobConfig()));
-
-    control.replay();
-
-    assertTrue(cron.deleteJob(job.getKey()));
-  }
-
-  private IExpectationSetters<?> expectTaskKilled(String id) {
-    expect(stateManager.changeState(
-        id,
-        Optional.<ScheduleStatus>absent(),
-        ScheduleStatus.KILLING,
-        CronJobManager.KILL_AUDIT_MESSAGE))
-        .andReturn(true);
-    return expectLastCall();
-  }
-
-  @Test
-  public void testDelayedStart() throws Exception {
-    expectJobAccepted();
-    expectJobFetch();
-
-    // Query to test if live tasks exist for the job.
-    expectActiveTaskFetch(TASK);
-
-    // Live tasks exist, so the cron manager must delay the cron launch.
-    delayExecutor.execute(capture(delayLaunchCapture));
-
-    // The cron manager will then try to initiate the kill.
-    expectTaskKilled(TASK_ID);
-
-    // Immediate query and delayed query.
-    expectActiveTaskFetch(TASK).times(2);
-
-    // Simulate the live task disappearing.
-    expectActiveTaskFetch();
-
-    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-    cron.startJobNow(job.getKey());
-    assertEquals(ImmutableSet.of(job.getKey()), cron.getPendingRuns());
-    delayLaunchCapture.getValue().run();
-    assertEquals(ImmutableSet.<IJobKey>of(), cron.getPendingRuns());
-  }
-
-  @Test
-  public void testDelayedStartResets() throws Exception {
-    expectJobAccepted();
-    expectJobFetch();
-
-    // Query to test if live tasks exist for the job.
-    expectActiveTaskFetch(TASK);
-
-    // Live tasks exist, so the cron manager must delay the cron launch.
-    delayExecutor.execute(capture(delayLaunchCapture));
-
-    // The cron manager will then try to initiate the kill.
-    expectTaskKilled(TASK_ID);
-
-    // Immediate query and delayed query.
-    expectActiveTaskFetch(TASK).times(2);
-
-    // Simulate the live task disappearing.
-    expectActiveTaskFetch();
-
-    // Round two.
-    expectJobFetch();
-    expectActiveTaskFetch(TASK);
-    delayExecutor.execute(capture(delayLaunchCapture));
-    expectTaskKilled(TASK_ID);
-    expectActiveTaskFetch(TASK).times(2);
-    expectActiveTaskFetch();
-
-    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-    expectLastCall().times(2);
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-    cron.startJobNow(job.getKey());
-    delayLaunchCapture.getValue().run();
-
-    // Start the job again.  Since the previous delayed start completed, this should repeat the
-    // entire process.
-    cron.startJobNow(job.getKey());
-    delayLaunchCapture.getValue().run();
-  }
-
-  @Test
-  public void testDelayedStartMultiple() throws Exception {
-    expectJobAccepted();
-    expectJobFetch();
-
-    // Query to test if live tasks exist for the job.
-    expectActiveTaskFetch(TASK).times(3);
-
-    // Live tasks exist, so the cron manager must delay the cron launch.
-    delayExecutor.execute(capture(delayLaunchCapture));
-
-    // The cron manager will then try to initiate the kill.
-    expectJobFetch();
-    expectJobFetch();
-    expectTaskKilled(TASK_ID).times(3);
-
-    // Immediate queries and delayed query.
-    expectActiveTaskFetch(TASK).times(4);
-
-    // Simulate the live task disappearing.
-    expectActiveTaskFetch();
-
-    stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-
-    // Attempt to trick the cron manager into launching multiple times, or launching multiple
-    // pollers.
-    cron.startJobNow(job.getKey());
-    cron.startJobNow(job.getKey());
-    cron.startJobNow(job.getKey());
-    delayLaunchCapture.getValue().run();
-  }
-
-  @Test
-  public void testUpdate() throws Exception {
-    SanitizedConfiguration updated = new SanitizedConfiguration(
-        IJobConfiguration.build(job.newBuilder().setCronSchedule("1 2 3 4 5")));
-
-    expectJobAccepted();
-    cronScheduler.deschedule(DEFAULT_JOB_KEY);
-    expectJobAccepted(updated.getJobConfig());
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-    cron.updateJob(updated);
-  }
-
-  @Test
-  public void testConsistentState() throws Exception {
-    IJobConfiguration updated =
-        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
-
-    expectJobAccepted();
-    cronScheduler.deschedule(DEFAULT_JOB_KEY);
-    expectJobAccepted(updated);
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-
-    IJobConfiguration failedUpdate =
-        IJobConfiguration.build(updated.newBuilder().setCronSchedule(null));
-    try {
-      cron.updateJob(new SanitizedConfiguration(failedUpdate));
-      fail();
-    } catch (ScheduleException e) {
-      // Expected.
-    }
-
-    cron.updateJob(new SanitizedConfiguration(updated));
-  }
-
-  @Test
-  public void testInvalidStoredJob() throws Exception {
-    // Invalid jobs are left alone, but doesn't halt operation.
-
-    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
-    cronScheduler.awaitRunning();
-    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
-    IJobConfiguration jobA =
-        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5 6 7"));
-    IJobConfiguration jobB =
-        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule(null));
-
-    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA, jobB));
-    expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(false);
-
-    control.replay();
-
-    cron.schedulerActive(new SchedulerActive());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testJobStoredTwice() throws Exception {
-    // Simulate an inconsistent storage that contains two cron jobs under the same key.
-
-    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
-    cronScheduler.awaitRunning();
-    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
-    IJobConfiguration jobA =
-        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("1 2 3 4 5"));
-    IJobConfiguration jobB =
-        IJobConfiguration.build(makeJob().newBuilder().setCronSchedule("* * * * *"));
-    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY))
-        .andReturn(ImmutableList.of(jobA, jobB));
-    expectJobValidated(jobA);
-    expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("keyA");
-    expectJobValidated(jobB);
-
-    control.replay();
-
-    cron.schedulerActive(new SchedulerActive());
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testInvalidCronSchedule() throws Exception {
-    expect(cronScheduler.isValidSchedule(job.getCronSchedule())).andReturn(false);
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-  }
-
-  @Test
-  public void testCancelNewCollision() throws Exception {
-    IJobConfiguration killExisting = IJobConfiguration.build(
-        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW));
-    Capture<Runnable> jobTriggerCapture = expectJobAccepted(killExisting);
-    expectActiveTaskFetch(TASK);
-
-    control.replay();
-
-    cron.receiveJob(new SanitizedConfiguration(killExisting));
-    jobTriggerCapture.getValue().run();
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testScheduleFails() throws Exception {
-    expectJobValidated(job);
-    storageUtil.jobStore.saveAcceptedJob(MANAGER_KEY, sanitizedConfiguration.getJobConfig());
-    expect(cronScheduler.schedule(eq(job.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andThrow(new CronException("injected"));
-
-    control.replay();
-
-    cron.receiveJob(sanitizedConfiguration);
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testRunOverlapRejected() throws Exception {
-    IJobConfiguration killExisting = IJobConfiguration.build(
-        job.newBuilder().setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
-
-    control.replay();
-
-    cron.receiveJob(new SanitizedConfiguration(killExisting));
-  }
-
-  @Test
-  public void testRunOverlapLoadedSuccessfully() throws Exception {
-    // Existing RUN_OVERLAP jobs should still load and map.
-
-    expect(cronScheduler.startAsync()).andReturn(cronScheduler);
-    cronScheduler.awaitRunning();
-    shutdownRegistry.addAction(EasyMock.<ExceptionalCommand<?>>anyObject());
-
-    IJobConfiguration jobA =
-        IJobConfiguration.build(makeJob().newBuilder()
-            .setCronCollisionPolicy(CronCollisionPolicy.RUN_OVERLAP));
-
-    expect(storageUtil.jobStore.fetchJobs(MANAGER_KEY)).andReturn(ImmutableList.of(jobA));
-    expect(cronScheduler.isValidSchedule(jobA.getCronSchedule())).andReturn(true);
-    expect(cronScheduler.schedule(eq(jobA.getCronSchedule()), EasyMock.<Runnable>anyObject()))
-        .andReturn("keyA");
-
-    control.replay();
-
-    cron.schedulerActive(new SchedulerActive());
-  }
-
-  private IJobConfiguration makeJob() {
-    return IJobConfiguration.build(new JobConfiguration()
-        .setOwner(new Identity(OWNER, OWNER))
-        .setKey(JobKeys.from(OWNER, ENVIRONMENT, JOB_NAME).newBuilder())
-        .setCronSchedule("1 1 1 1 1")
-        .setTaskConfig(defaultTask())
-        .setInstanceCount(1));
-  }
-
-  private static TaskConfig defaultTask() {
-    return new TaskConfig()
-        .setContactEmail("testing@twitter.com")
-        .setExecutorConfig(new ExecutorConfig("aurora", "data"))
-        .setNumCpus(1)
-        .setRamMb(1024)
-        .setDiskMb(1024)
-        .setJobName(JOB_NAME)
-        .setOwner(new Identity(OWNER, OWNER))
-        .setEnvironment(DEFAULT_ENVIRONMENT);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index c8ad55d..cdb1f5d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -133,6 +133,6 @@ public class LockManagerImplTest extends EasyMockTest {
 
   private void expectLockException(IJobKey key) {
     expectedException.expect(LockException.class);
-    expectedException.expectMessage(JobKeys.toPath(key));
+    expectedException.expectMessage(JobKeys.canonicalString(key));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 405da0a..2142f11 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -76,10 +76,13 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -137,7 +140,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final IJobKey JOB_KEY = JobKeys.from(ROLE, DEFAULT_ENVIRONMENT, JOB_NAME);
   private static final ILockKey LOCK_KEY = ILockKey.build(LockKey.job(JOB_KEY.newBuilder()));
   private static final ILock LOCK = ILock.build(new Lock().setKey(LOCK_KEY.newBuilder()));
-  private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("test");
+  private static final JobConfiguration CRON_JOB = makeJob().setCronSchedule("* * * * *");
   private static final Lock DEFAULT_LOCK = null;
 
   private static final IResourceAggregate QUOTA =
@@ -678,7 +681,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplate() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    cronJobManager.updateJob(anyObject(SanitizedConfiguration.class));
+    cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
     control.replay();
 
     assertOkResponse(thrift.replaceCronTemplate(CRON_JOB, DEFAULT_LOCK, SESSION));
@@ -707,9 +710,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testReplaceCronTemplateDoesNotExist() throws Exception {
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    cronJobManager.updateJob(
-        SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(CRON_JOB)));
-    expectLastCall().andThrow(new ScheduleException("Nope"));
+    cronJobManager.updateJob(anyObject(SanitizedCronJob.class));
+    expectLastCall().andThrow(new CronException("Nope"));
+
     control.replay();
 
     assertResponse(INVALID_REQUEST, thrift.replaceCronTemplate(CRON_JOB, DEFAULT_LOCK, SESSION));
@@ -1004,7 +1007,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     Set<JobSummary> ownedImmedieteJobSummaryOnly = ImmutableSet.of(
         new JobSummary().setJob(ownedImmediateJob).setStats(new JobStats().setActiveTaskCount(1)));
 
-    expect(cronPredictor.predictNextRun(CRON_SCHEDULE))
+    expect(cronPredictor.predictNextRun(CrontabEntry.parse(CRON_SCHEDULE)))
         .andReturn(new Date(nextCronRunMs))
         .anyTimes();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 488a545..5f32f21 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -34,8 +34,8 @@ import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.gen.SessionKey;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -149,7 +149,7 @@ public class ThriftIT extends EasyMockTest {
 
           @Override
           protected void configure() {
-            bindMock(CronScheduler.class);
+            bindMock(CronJobManager.class);
             bindMock(MaintenanceController.class);
             bindMock(Recovery.class);
             bindMock(SchedulerCore.class);