You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/04/23 20:45:37 UTC

[5/5] git commit: AURORA-132: Cron system based on Quartz

AURORA-132: Cron system based on Quartz

This introduces a new cron system based on Quartz and removes the
NoopCronModule.

Testing Done:
./gradlew build

Bugs closed: AURORA-132

Reviewed at https://reviews.apache.org/r/19767/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c285f2f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c285f2f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c285f2f8

Branch: refs/heads/master
Commit: c285f2f83dc769143bc4a3be67e3d53b8fc51418
Parents: 760e5d3
Author: Kevin Sweeney <ke...@apache.org>
Authored: Wed Apr 23 11:45:15 2014 -0700
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Wed Apr 23 11:45:15 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |    1 +
 .../aurora/scheduler/MesosTaskFactory.java      |    2 +-
 .../aurora/scheduler/app/SchedulerMain.java     |   20 +-
 .../aurora/scheduler/async/TaskGroups.java      |    2 +-
 .../apache/aurora/scheduler/base/JobKeys.java   |   40 +-
 .../configuration/ConfigurationManager.java     |   12 +-
 .../aurora/scheduler/cron/CronJobManager.java   |   97 +
 .../aurora/scheduler/cron/CronPredictor.java    |    2 +-
 .../aurora/scheduler/cron/CronScheduler.java    |   40 +-
 .../aurora/scheduler/cron/CrontabEntryTest.java |  163 -
 .../aurora/scheduler/cron/SanitizedCronJob.java |  131 +
 .../scheduler/cron/noop/NoopCronModule.java     |   40 -
 .../scheduler/cron/noop/NoopCronPredictor.java  |   33 -
 .../scheduler/cron/noop/NoopCronScheduler.java  |   83 -
 .../scheduler/cron/quartz/AuroraCronJob.java    |  231 ++
 .../cron/quartz/AuroraCronJobFactory.java       |   49 +
 .../cron/quartz/CronJobManagerImpl.java         |  256 ++
 .../scheduler/cron/quartz/CronLifecycle.java    |  114 +
 .../scheduler/cron/quartz/CronModule.java       |  130 +
 .../cron/quartz/CronPredictorImpl.java          |   46 +
 .../cron/quartz/CronSchedulerImpl.java          |   71 +
 .../aurora/scheduler/cron/quartz/Quartz.java    |  124 +
 .../scheduler/cron/testing/AbstractCronIT.java  |  135 -
 .../org/apache/aurora/scheduler/http/Cron.java  |    3 +-
 .../aurora/scheduler/http/ServletModule.java    |    2 +-
 .../aurora/scheduler/http/StructDump.java       |   15 +-
 .../aurora/scheduler/state/CronJobManager.java  |  484 ---
 .../aurora/scheduler/state/LockManagerImpl.java |    2 +-
 .../scheduler/state/SchedulerCoreImpl.java      |   30 +-
 .../aurora/scheduler/state/StateModule.java     |    7 -
 .../thrift/SchedulerThriftInterface.java        |   40 +-
 .../cron/testing/cron-schedule-predictions.json | 3332 ------------------
 .../aurora/scheduler/cron/CrontabEntryTest.java |  168 +
 .../scheduler/cron/ExpectedPrediction.java      |   55 +
 .../aurora/scheduler/cron/noop/NoopCronIT.java  |   63 -
 .../cron/quartz/AuroraCronJobTest.java          |  174 +
 .../aurora/scheduler/cron/quartz/CronIT.java    |  257 ++
 .../cron/quartz/CronJobManagerImplTest.java     |  221 ++
 .../cron/quartz/CronPredictorImplTest.java      |   89 +
 .../scheduler/cron/quartz/QuartzTestUtil.java   |   78 +
 .../state/BaseSchedulerCoreImplTest.java        |  344 +-
 .../scheduler/state/CronJobManagerTest.java     |  490 ---
 .../scheduler/state/LockManagerImplTest.java    |    2 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |   17 +-
 .../aurora/scheduler/thrift/ThriftIT.java       |    4 +-
 .../scheduler/cron/expected-predictions.json    | 3332 ++++++++++++++++++
 46 files changed, 5808 insertions(+), 5223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 459cd85..831d8b3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -163,6 +163,7 @@ dependencies {
   compile 'org.apache.mesos:mesos:0.17.0'
   compile thriftLib
   compile 'org.apache.zookeeper:zookeeper:3.3.4'
+  compile 'org.quartz-scheduler:quartz:2.2.1'
   compile "org.slf4j:slf4j-api:${slf4jRev}"
   compile "org.slf4j:slf4j-jdk14:${slf4jRev}"
   compile 'com.twitter.common.logging:log4j:0.0.7'

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
index 86bbc29..bdd8c19 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
@@ -135,7 +135,7 @@ public interface MesosTaskFactory {
       }
       TaskInfo.Builder taskBuilder =
           TaskInfo.newBuilder()
-              .setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+              .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
               .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
               .setSlaveId(slaveId)
               .addAllResources(resources)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index bf3d7a3..da6f5e5 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -59,9 +59,7 @@ import org.apache.aurora.scheduler.DriverFactory;
 import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
 import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
 import org.apache.aurora.scheduler.SchedulerLifecycle;
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.cron.noop.NoopCronModule;
+import org.apache.aurora.scheduler.cron.quartz.CronModule;
 import org.apache.aurora.scheduler.local.IsolatedSchedulerModule;
 import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
@@ -115,17 +113,7 @@ public class SchedulerMain extends AbstractApplication {
       .add(CapabilityValidator.class)
       .build();
 
-  @CmdLine(name = "cron_module",
-      help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
-  private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
-      Arg.create(NoopCronModule.class);
-
-  private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
-      .add(CronPredictor.class)
-      .add(CronScheduler.class)
-      .build();
-
-  // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
+  // TODO(Suman Karumuri): Pass in AUTH as extra module
   @CmdLine(name = "extra_modules",
       help = "A list of modules that provide additional functionality.")
   private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
@@ -151,8 +139,7 @@ public class SchedulerMain extends AbstractApplication {
 
   private static Iterable<? extends Module> getExtraModules() {
     Builder<Module> modules = ImmutableList.builder();
-    modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
-        .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
+    modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES));
 
     for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
       modules.add(Modules.getModule(moduleClass));
@@ -173,6 +160,7 @@ public class SchedulerMain extends AbstractApplication {
         .addAll(getExtraModules())
         .add(new LogStorageModule())
         .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
+        .add(new CronModule())
         .add(new ThriftModule())
         .add(new ThriftAuthModule())
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 6aff091..ef73032 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -243,7 +243,7 @@ public class TaskGroups implements EventSubscriber {
 
     @Override
     public String toString() {
-      return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
+      return JobKeys.canonicalString(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
index db1bec4..c81ac62 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
@@ -15,17 +15,20 @@
  */
 package org.apache.aurora.scheduler.base;
 
+import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.base.Strings;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -83,9 +86,9 @@ public final class JobKeys {
    */
   public static boolean isValid(@Nullable IJobKey jobKey) {
     return jobKey != null
-        && !Strings.isNullOrEmpty(jobKey.getRole())
-        && !Strings.isNullOrEmpty(jobKey.getEnvironment())
-        && !Strings.isNullOrEmpty(jobKey.getName());
+        && ConfigurationManager.isGoodIdentifier(jobKey.getRole())
+        && ConfigurationManager.isGoodIdentifier(jobKey.getEnvironment())
+        && ConfigurationManager.isGoodIdentifier(jobKey.getName());
   }
 
   /**
@@ -132,25 +135,32 @@ public final class JobKeys {
   }
 
   /**
-   * Create a "/"-delimited String representation of a job key, suitable for logging but not
-   * necessarily suitable for use as a unique identifier.
+   * Create a "/"-delimited representation of job key usable as a unique identifier in this cluster.
    *
+   * It is guaranteed that {@code k.equals(JobKeys.parse(JobKeys.canonicalString(k)))}.
+   *
+   * @see #parse(String)
    * @param jobKey Key to represent.
-   * @return "/"-delimited representation of the key.
+   * @return Canonical "/"-delimited representation of the key.
    */
-  public static String toPath(IJobKey jobKey) {
-    return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
+  public static String canonicalString(IJobKey jobKey) {
+    return Joiner.on("/").join(jobKey.getRole(), jobKey.getEnvironment(), jobKey.getName());
   }
 
   /**
-   * Create a "/"-delimited String representation of job key, suitable for logging but not
-   * necessarily suitable for use as a unique identifier.
+   * Create a job key from a "role/environment/name" representation.
+   *
+   * It is guaranteed that {@code k.equals(JobKeys.parse(JobKeys.canonicalString(k)))}.
    *
-   * @param job Job to represent.
-   * @return "/"-delimited representation of the job's key.
+   * @see #canonicalString(IJobKey)
+   * @param string Input to parse.
+   * @return Parsed representation.
+   * @throws IllegalArgumentException when the string fails to parse.
    */
-  public static String toPath(IJobConfiguration job) {
-    return toPath(job.getKey());
+  public static IJobKey parse(String string) throws IllegalArgumentException {
+    List<String> components = Splitter.on("/").splitToList(string);
+    checkArgument(components.size() == 3);
+    return from(components.get(0), components.get(1), components.get(2));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 82034e0..e5ad461 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -164,9 +164,15 @@ public final class ConfigurationManager {
     // Utility class.
   }
 
-  @VisibleForTesting
-  static boolean isGoodIdentifier(String identifier) {
-    return GOOD_IDENTIFIER.matcher(identifier).matches()
+  /**
+   * Verifies that an identifier is an acceptable name component.
+   *
+   * @param identifier Identifier to check.
+   * @return false if the identifier is null or invalid.
+   */
+  public static boolean isGoodIdentifier(@Nullable String identifier) {
+    return identifier != null
+        && GOOD_IDENTIFIER.matcher(identifier).matches()
         && (identifier.length() <= MAX_IDENTIFIER_LENGTH);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
new file mode 100644
index 0000000..7c8d5ec
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron;
+
+import java.util.Map;
+
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Manages the persistence and scheduling of jobs that should be run periodically on a cron
+ * schedule.
+ */
+public interface CronJobManager {
+  /**
+   * Triggers execution of a job.
+   *
+   * @param jobKey Key of the job to start.
+   * @throws CronException If the job could not be started with the cron system.
+   */
+  void startJobNow(IJobKey jobKey) throws CronException;
+
+  /**
+   * Update an existing cron job in storage and schedule it for future execution.
+   *
+   * @param config Cron job configuration to update to.
+   * @throws CronException If a job with the same key does not exist or the job could not be
+   * scheduled.
+   */
+  void updateJob(SanitizedCronJob config) throws CronException;
+
+  /**
+   * Persist a cron job to storage and schedule it for future execution.
+   *
+   * @param config New cron job configuration.
+   * @throws CronException If a job with the same key exists or the job could not be scheduled.
+   */
+  void createJob(SanitizedCronJob config) throws CronException;
+
+  /**
+   * Get all cron jobs.
+   *
+   * TODO(ksweeney): Consider deprecating this and letting caller query storage directly.
+   *
+   * @return An immutable snapshot of cron jobs at some instant.
+   */
+  Iterable<IJobConfiguration> getJobs();
+
+  /**
+   * Test whether a job exists.
+   *
+   * TODO(ksweeney): Consider deprecating this and letting caller query storage directly.
+   *
+   * @param jobKey Key of the job to check.
+   * @return false when a job does not exist in storage.
+   */
+  boolean hasJob(IJobKey jobKey);
+
+  /**
+   * Remove a job and deschedule it.
+   *
+   * @param jobKey Key of the job to delete.
+   * @return true if a job was removed.
+   */
+  boolean deleteJob(IJobKey jobKey);
+
+  /**
+   * A list of the currently scheduled jobs and when they will run according to the underlying
+   * execution engine.
+   *
+   * @return A map from job to the cron schedule in use for that job.
+   */
+  Map<IJobKey, CrontabEntry> getScheduledJobs();
+
+  /**
+   * The unique ID of this cron job manager, used as a prefix in the JobStore.
+   *
+   * TODO(ksweeney): Consider removing this from storage entirely since the JobManager abstraction
+   * is gone.
+   *
+   * @return The unique ID of the manager.
+   */
+  String getManagerKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java b/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
index df0c378..0ce60f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
@@ -27,5 +27,5 @@ public interface CronPredictor {
    * @param schedule Cron schedule to predict the next time for.
    * @return A prediction for the next time a cron will run.
    */
-  Date predictNextRun(String schedule);
+  Date predictNextRun(CrontabEntry schedule);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
index 56e9950..f38dea5 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
@@ -15,49 +15,19 @@
  */
 package org.apache.aurora.scheduler.cron;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Service;
+
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 
 /**
  * An execution manager that executes work on a cron schedule.
  */
-public interface CronScheduler extends Service {
-  /**
-   * Schedules a task on a cron schedule.
-   *
-   * @param schedule Cron-style schedule.
-   * @param task Work to run when on the cron schedule.
-   * @return A unique ID to identify the scheduled cron task.
-   * @throws CronException when there was a failure to schedule, for example if {@code schedule}
-   *         is not a valid input.
-   * @throws IllegalStateException If the cron scheduler is not currently running.
-   */
-  String schedule(String schedule, Runnable task) throws CronException, IllegalStateException;
-
-  /**
-   * Removes a scheduled cron item.
-   *
-   * @param key Key previously returned from {@link #schedule(String, Runnable)}.
-   * @throws IllegalStateException If the cron scheduler is not currently running.
-   */
-  void deschedule(String key) throws IllegalStateException;
-
+public interface CronScheduler {
   /**
    * Gets the cron schedule associated with a scheduling key.
    *
-   * @param key Key previously returned from {@link #schedule(String, Runnable)}.
+   * @param key Key of the job whose cron schedule should be looked up.
    * @return The task's cron schedule, if a matching task was found.
-   * @throws IllegalStateException If the cron scheduler is not currently running.
-   */
-  Optional<String> getSchedule(String key) throws IllegalStateException;
-
-  /**
-   * Checks to see if the scheduler would be accepted by the underlying scheduler.
-   *
-   * @param schedule Cron scheduler to validate.
-   * @return {@code true} if the schedule is valid.
    */
-  boolean isValidSchedule(@Nullable String schedule);
+  Optional<CrontabEntry> getSchedule(IJobKey key);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java b/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
deleted file mode 100644
index 2bb848a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron;
-
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class CrontabEntryTest {
-  @Test
-  public void testHashCodeAndEquals() {
-
-    List<CrontabEntry> entries = ImmutableList.of(
-        CrontabEntry.parse("* * * * *"),
-        CrontabEntry.parse("0-59 * * * *"),
-        CrontabEntry.parse("0-57,58,59 * * * *"),
-        CrontabEntry.parse("* 23,1,2,4,0-22 * * *"),
-        CrontabEntry.parse("1-50,0,51-59 * * * sun-sat"));
-
-    for (CrontabEntry lhs : entries) {
-      for (CrontabEntry rhs : entries) {
-        assertEquals(lhs, rhs);
-      }
-    }
-
-    Set<CrontabEntry> equivalentEntries = Sets.newHashSet(entries);
-    assertTrue(equivalentEntries.size() == 1);
-  }
-
-  @Test
-  public void testEqualsCoverage() {
-    assertNotEquals(CrontabEntry.parse("* * * * *"), new Object());
-
-    assertNotEquals(CrontabEntry.parse("* * * * *"), CrontabEntry.parse("1 * * * *"));
-    assertEquals(CrontabEntry.parse("1,2,3 * * * *"), CrontabEntry.parse("1-3 * * * *"));
-
-    assertNotEquals(CrontabEntry.parse("* 0-22 * * *"), CrontabEntry.parse("* * * * *"));
-    assertEquals(CrontabEntry.parse("* 0-23 * * *"), CrontabEntry.parse("* * * * *"));
-
-    assertNotEquals(CrontabEntry.parse("1 1 1-30 * *"), CrontabEntry.parse("1 1 * * *"));
-    assertEquals(CrontabEntry.parse("1 1 1-31 * *"), CrontabEntry.parse("1 1 * * *"));
-
-    assertNotEquals(CrontabEntry.parse("1 1 * JAN,FEB-NOV *"), CrontabEntry.parse("1 1 * * *"));
-    assertEquals(CrontabEntry.parse("1 1 * JAN,FEB-DEC *"), CrontabEntry.parse("1 1 * * *"));
-
-    assertNotEquals(CrontabEntry.parse("* * * * SUN"), CrontabEntry.parse("* * * * SAT"));
-    assertEquals(CrontabEntry.parse("* * * * 0"), CrontabEntry.parse("* * * * SUN"));
-  }
-
-  @Test
-  public void testSkip() {
-    assertEquals(CrontabEntry.parse("*/15 * * * *"), CrontabEntry.parse("0,15,30,45 * * * *"));
-    assertEquals(
-        CrontabEntry.parse("* */2 * * *"),
-        CrontabEntry.parse("0-59 0,2,4,6,8,10,12-23/2  * * *"));
-  }
-
-  @Test
-  public void testToString() {
-    assertEquals("0-58 * * * *", CrontabEntry.parse("0,1-57,58 * * * *").toString());
-    assertEquals("* * * * *", CrontabEntry.parse("* * * * *").toString());
-  }
-
-  @Test
-  public void testWildcards() {
-    CrontabEntry wildcardMinuteEntry = CrontabEntry.parse("* 1 1 1 *");
-    assertEquals("*", wildcardMinuteEntry.getMinuteAsString());
-    assertTrue(wildcardMinuteEntry.hasWildcardMinute());
-    assertFalse(wildcardMinuteEntry.hasWildcardHour());
-    assertFalse(wildcardMinuteEntry.hasWildcardDayOfMonth());
-    assertFalse(wildcardMinuteEntry.hasWildcardMonth());
-    assertTrue(wildcardMinuteEntry.hasWildcardDayOfWeek());
-
-    CrontabEntry wildcardHourEntry = CrontabEntry.parse("1 * 1 1 *");
-    assertEquals("*", wildcardHourEntry.getHourAsString());
-    assertFalse(wildcardHourEntry.hasWildcardMinute());
-    assertTrue(wildcardHourEntry.hasWildcardHour());
-    assertFalse(wildcardHourEntry.hasWildcardDayOfMonth());
-    assertFalse(wildcardHourEntry.hasWildcardMonth());
-    assertTrue(wildcardHourEntry.hasWildcardDayOfWeek());
-
-    CrontabEntry wildcardDayOfMonth = CrontabEntry.parse("1 1 * 1 *");
-    assertEquals("*", wildcardDayOfMonth.getDayOfMonthAsString());
-    assertFalse(wildcardDayOfMonth.hasWildcardMinute());
-    assertFalse(wildcardDayOfMonth.hasWildcardHour());
-    assertTrue(wildcardDayOfMonth.hasWildcardDayOfMonth());
-    assertFalse(wildcardDayOfMonth.hasWildcardMonth());
-    assertTrue(wildcardDayOfMonth.hasWildcardDayOfWeek());
-
-    CrontabEntry wildcardMonth = CrontabEntry.parse("1 1 1 * *");
-    assertEquals("*", wildcardMonth.getMonthAsString());
-    assertFalse(wildcardMonth.hasWildcardMinute());
-    assertFalse(wildcardMonth.hasWildcardHour());
-    assertFalse(wildcardMonth.hasWildcardDayOfMonth());
-    assertTrue(wildcardMonth.hasWildcardMonth());
-    assertTrue(wildcardMonth.hasWildcardDayOfWeek());
-
-    CrontabEntry wildcardDayOfWeek = CrontabEntry.parse("1 1 1 1 *");
-    assertEquals("*", wildcardDayOfWeek.getDayOfWeekAsString());
-    assertFalse(wildcardDayOfWeek.hasWildcardMinute());
-    assertFalse(wildcardDayOfWeek.hasWildcardHour());
-    assertFalse(wildcardDayOfWeek.hasWildcardDayOfMonth());
-    assertFalse(wildcardDayOfWeek.hasWildcardMonth());
-    assertTrue(wildcardDayOfWeek.hasWildcardDayOfWeek());
-  }
-
-  @Test
-  public void testEqualsIsCanonical() {
-    String rawEntry = "* * */3 * *";
-    CrontabEntry input = CrontabEntry.parse(rawEntry);
-    assertNotEquals(
-        rawEntry + " is not the canonical form of " + input,
-        rawEntry,
-        input.toString());
-    assertEquals(
-        "The form returned by toString is canonical",
-        input.toString(),
-        CrontabEntry.parse(input.toString()).toString());
-  }
-
-  @Test
-  public void testBadEntries() {
-    List<String> badPatterns = ImmutableList.of(
-        "* * * * MON-SUN",
-        "* * **",
-        "0-59 0-59 * * *",
-        "1/1 * * * *",
-        "5 5 * MAR-JAN *",
-        "*/0 * * * *",
-        "0-59/0 * * * *",
-        "0-59/60 * * * *",
-        "* * * *, *",
-        "* * 1 * 1"
-    );
-
-    for (String pattern : badPatterns) {
-      assertNull(CrontabEntry.tryParse(pattern).orNull());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
new file mode 100644
index 0000000..0082932
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.commons.lang.StringUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Used by functions that expect field validation before being called.
+ */
+public final class SanitizedCronJob {
+  private final SanitizedConfiguration config;
+  private final CrontabEntry crontabEntry;
+
+  private SanitizedCronJob(IJobConfiguration unsanitized)
+      throws CronException, ConfigurationManager.TaskDescriptionException {
+
+    this(SanitizedConfiguration.fromUnsanitized(unsanitized));
+  }
+
+  private SanitizedCronJob(SanitizedConfiguration config) throws CronException {
+    final IJobConfiguration job = config.getJobConfig();
+    if (!hasCronSchedule(job)) {
+      throw new CronException(String.format(
+          "Not a valid cron job, %s has no cron schedule", JobKeys.canonicalString(job.getKey())));
+    }
+
+    Optional<CrontabEntry> entry = CrontabEntry.tryParse(job.getCronSchedule());
+    if (!entry.isPresent()) {
+      throw new CronException("Invalid cron schedule: " + job.getCronSchedule());
+    }
+
+    this.config = config;
+    this.crontabEntry = entry.get();
+  }
+
+  /**
+   * Get the default cron collision policy.
+   *
+   * @param policy A (possibly null) policy.
+   * @return The given policy or a default if the policy was null.
+   */
+  public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
+    return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
+  }
+
+  /**
+   * Create a SanitizedCronJob from a SanitizedConfiguration. SanitizedCronJob performs additional
+   * validation to ensure that the provided job contains all properties needed to run it on a
+   * cron schedule.
+   *
+   * @param config Config to validate.
+   * @return Config wrapped in defaults.
+   * @throws CronException If a cron-specific validation error occurred.
+   */
+  public static SanitizedCronJob from(SanitizedConfiguration config)
+      throws CronException {
+
+    return new SanitizedCronJob(config);
+  }
+
+  /**
+   * Create a cron job from an unsanitized input job. Suitable for RPC input validation.
+   *
+   * @param unsanitized Unsanitized input job.
+   * @return A sanitized job if all validation succeeds.
+   * @throws CronException If validation fails with a cron-specific error.
+   * @throws ConfigurationManager.TaskDescriptionException If validation fails with a non
+   * cron-specific error.
+   */
+  public static SanitizedCronJob fromUnsanitized(IJobConfiguration unsanitized)
+      throws CronException, ConfigurationManager.TaskDescriptionException {
+
+    return new SanitizedCronJob(unsanitized);
+  }
+
+  /**
+   * Get this job's cron collision policy.
+   *
+   * @return This job's cron collision policy.
+   */
+  public CronCollisionPolicy getCronCollisionPolicy() {
+    return orDefault(config.getJobConfig().getCronCollisionPolicy());
+  }
+
+  private static boolean hasCronSchedule(IJobConfiguration job) {
+    checkNotNull(job);
+    return !StringUtils.isEmpty(job.getCronSchedule());
+  }
+
+  /**
+   * Returns the cron schedule associated with this job.
+   *
+   * @return The cron schedule associated with this job.
+   */
+  public CrontabEntry getCrontabEntry() {
+    return crontabEntry;
+  }
+
+  /**
+   * Returns the sanitized job configuration associated with the cron job.
+   *
+   * @return This cron job's sanitized job configuration.
+   */
+  public SanitizedConfiguration getSanitizedConfig() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
deleted file mode 100644
index e0935f5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-
-/**
- * A Module to wire up a cron scheduler that does not actually schedule cron jobs.
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-public class NoopCronModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    bind(CronScheduler.class).to(NoopCronScheduler.class);
-    bind(NoopCronScheduler.class).in(Singleton.class);
-
-    bind(CronPredictor.class).to(NoopCronPredictor.class);
-    bind(NoopCronPredictor.class).in(Singleton.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
deleted file mode 100644
index 7b25152..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import java.util.Date;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-
-/**
- * A cron predictor that always suggests that the next run is Unix epoch time.
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-class NoopCronPredictor implements CronPredictor {
-  @Override
-  public Date predictNextRun(String schedule) {
-    return new Date(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
deleted file mode 100644
index a31551c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import java.util.Collections;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.aurora.scheduler.cron.CronScheduler;
-
-/**
- * A cron scheduler that accepts cron jobs but never runs them. Useful if you want to hook up an
- * external triggering mechanism (e.g. a system cron job that calls the startCronJob RPC manually
- * on an interval).
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-class NoopCronScheduler extends AbstractIdleService implements CronScheduler {
-  private static final Logger LOG = Logger.getLogger(NoopCronScheduler.class.getName());
-
-  // Keep a list of schedules we've seen.
-  private final Set<String> schedules = Collections.synchronizedSet(Sets.<String>newHashSet());
-
-  @Override
-  public void startUp() throws Exception {
-    LOG.warning("NO-OP cron scheduler is in use. Cron jobs submitted will not be triggered!");
-  }
-
-  @Override
-  public void shutDown() {
-    // No-op.
-  }
-
-  @Override
-  public String schedule(String schedule, Runnable task) {
-    schedules.add(schedule);
-
-    LOG.warning(String.format(
-        "NO-OP cron scheduler is in use! %s with schedule %s WILL NOT be automatically triggered!",
-        task,
-        schedule));
-
-    return schedule;
-  }
-
-  @Override
-  public void deschedule(String key) throws IllegalStateException {
-    schedules.remove(key);
-  }
-
-  @Override
-  public Optional<String> getSchedule(String key) throws IllegalStateException {
-    return schedules.contains(key)
-        ? Optional.of(key)
-        : Optional.<String>absent();
-  }
-
-  @Override
-  public boolean isValidSchedule(@Nullable String schedule) {
-    // Accept everything.
-    return schedule != null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
new file mode 100644
index 0000000..fc02264
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -0,0 +1,231 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.common.base.Supplier;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+
+/**
+ * Encapsulates the logic behind a single trigger of a single job key. Multiple executions may run
+ * concurrently but only a single instance will be active at a time per job key.
+ *
+ * <p>
+ * Executions may block for long periods of time when waiting for a kill to complete. The Quartz
+ * scheduler should therefore be configured with a large number of threads.
+ */
+@DisallowConcurrentExecution
+class AuroraCronJob implements Job {
+  private static final Logger LOG = Logger.getLogger(AuroraCronJob.class.getName());
+
+  private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
+  private static final AtomicLong CRON_JOB_MISFIRES = Stats.exportLong("cron_job_misfires");
+  private static final AtomicLong CRON_JOB_PARSE_FAILURES =
+      Stats.exportLong("cron_job_parse_failures");
+  private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+
+  @VisibleForTesting
+  static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
+
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final CronJobManager cronJobManager;
+  private final BackoffHelper delayedStartBackoff;
+
+  @Inject
+  AuroraCronJob(
+      Config config,
+      Storage storage,
+      StateManager stateManager,
+      CronJobManager cronJobManager) {
+
+    this.storage = checkNotNull(storage);
+    this.stateManager = checkNotNull(stateManager);
+    this.cronJobManager = checkNotNull(cronJobManager);
+    this.delayedStartBackoff = checkNotNull(config.getDelayedStartBackoff());
+  }
+
+  private static final class DeferredLaunch {
+    private final Map<Integer, ITaskConfig> pendingTasks;
+    private final Set<String> activeTaskIds;
+
+    private DeferredLaunch(Map<Integer, ITaskConfig> pendingTasks, Set<String> activeTaskIds) {
+      this.pendingTasks = pendingTasks;
+      this.activeTaskIds = activeTaskIds;
+    }
+  }
+
+  @Override
+  public void execute(JobExecutionContext context) throws JobExecutionException {
+    // We assume quartz prevents concurrent runs of this job for a given job key. This allows us
+    // to avoid races where we might kill another run's tasks.
+    checkState(context.getJobDetail().isConcurrentExectionDisallowed());
+
+    doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+  }
+
+  @VisibleForTesting
+  void doExecute(final IJobKey key) throws JobExecutionException {
+    final String path = JobKeys.canonicalString(key);
+
+    final Optional<DeferredLaunch> deferredLaunch = storage.write(
+        new Storage.MutateWork.Quiet<Optional<DeferredLaunch>>() {
+          @Override
+          public Optional<DeferredLaunch> apply(Storage.MutableStoreProvider storeProvider) {
+            Optional<IJobConfiguration> config =
+                storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(), key);
+            if (!config.isPresent()) {
+              LOG.warning(String.format(
+                  "Cron was triggered for %s but no job with that key was found in storage.",
+                  path));
+              CRON_JOB_MISFIRES.incrementAndGet();
+              return Optional.absent();
+            }
+
+            SanitizedCronJob cronJob;
+            try {
+              cronJob = SanitizedCronJob.fromUnsanitized(config.get());
+            } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+              LOG.warning(String.format(
+                  "Invalid cron job for %s in storage - failed to parse with %s", path, e));
+              CRON_JOB_PARSE_FAILURES.incrementAndGet();
+              return Optional.absent();
+            }
+
+            CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+            LOG.info(String.format(
+                "Cron triggered for %s at %s with policy %s", path, new Date(), collisionPolicy));
+            CRON_JOB_TRIGGERS.incrementAndGet();
+
+            ImmutableMap<Integer, ITaskConfig> pendingTasks =
+                ImmutableMap.copyOf(cronJob.getSanitizedConfig().getTaskConfigs());
+
+            final Query.Builder activeQuery = Query.jobScoped(key).active();
+            Set<String> activeTasks =
+                Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+
+            if (activeTasks.isEmpty()) {
+              stateManager.insertPendingTasks(pendingTasks);
+              return Optional.absent();
+            }
+
+            CRON_JOB_COLLISIONS.incrementAndGet();
+            switch (collisionPolicy) {
+              case KILL_EXISTING:
+                return Optional.of(new DeferredLaunch(pendingTasks, activeTasks));
+
+              case RUN_OVERLAP:
+                LOG.severe(String.format("Ignoring trigger for job %s with deprecated collision"
+                    + " policy RUN_OVERLAP due to unterminated active tasks.", path));
+                return Optional.absent();
+
+              case CANCEL_NEW:
+                return Optional.absent();
+
+              default:
+                LOG.severe("Unrecognized cron collision policy: " + collisionPolicy);
+                return Optional.absent();
+            }
+          }
+        }
+    );
+
+    if (!deferredLaunch.isPresent()) {
+      return;
+    }
+
+    for (String taskId : deferredLaunch.get().activeTaskIds) {
+      stateManager.changeState(
+          taskId,
+          Optional.<ScheduleStatus>absent(),
+          KILLING,
+          KILL_AUDIT_MESSAGE);
+    }
+    LOG.info(String.format("Waiting for job to terminate before launching cron job %s.", path));
+
+    final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
+    try {
+      // NOTE: We block the quartz execution thread here until we've successfully killed our
+      // ancestor. We mitigate this by using a cached thread pool for quartz.
+      delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
+            LOG.info("Initiating delayed launch of cron " + path);
+            stateManager.insertPendingTasks(deferredLaunch.get().pendingTasks);
+            return true;
+          } else {
+            LOG.info("Not yet safe to run cron " + path);
+            return false;
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + path, e);
+      Thread.currentThread().interrupt();
+      throw new JobExecutionException(e);
+    }
+  }
+
+  static class Config {
+    private final BackoffHelper delayedStartBackoff;
+
+    Config(BackoffHelper delayedStartBackoff) {
+      this.delayedStartBackoff = checkNotNull(delayedStartBackoff);
+    }
+
+    public BackoffHelper getDelayedStartBackoff() {
+      return delayedStartBackoff;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
new file mode 100644
index 0000000..c5268cb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import org.quartz.Job;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Adapter that allows AuroraCronJobs to be constructed by Guice instead of directly by quartz.
+ */
+class AuroraCronJobFactory implements JobFactory {
+  private final Provider<AuroraCronJob> auroraCronJobProvider;
+
+  @Inject
+  AuroraCronJobFactory(Provider<AuroraCronJob> auroraCronJobProvider) {
+    this.auroraCronJobProvider = checkNotNull(auroraCronJobProvider);
+  }
+
+  @Override
+  public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
+    checkState(AuroraCronJob.class.equals(bundle.getJobDetail().getJobClass()),
+        "Quartz tried to run a type of job we don't know about: "
+            + bundle.getJobDetail().getJobClass());
+
+    return auroraCronJobProvider.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
new file mode 100644
index 0000000..8a5f569
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.JobStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.matchers.GroupMatcher;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * NOTE: The source of truth for whether a cron job exists or not is always the JobStore. If state
+ * somehow becomes inconsistent (i.e. a job key is scheduled for execution but its underlying
+ * JobConfiguration does not exist in storage), the execution of the job will log a warning and
+ * exit.
+ */
+class CronJobManagerImpl implements CronJobManager {
+  private static final Logger LOG = Logger.getLogger(CronJobManagerImpl.class.getName());
+
+  private final Storage storage;
+  private final Scheduler scheduler;
+  private final TimeZone timeZone;
+
+  @Inject
+  CronJobManagerImpl(Storage storage, Scheduler scheduler, TimeZone timeZone) {
+    this.storage = checkNotNull(storage);
+    this.scheduler = checkNotNull(scheduler);
+    this.timeZone = checkNotNull(timeZone);
+  }
+
+  @Override
+  public String getManagerKey() {
+    return "CRON";
+  }
+
+  @Override
+  public void startJobNow(final IJobKey jobKey) throws CronException {
+    checkNotNull(jobKey);
+
+    storage.weaklyConsistentRead(new Work<Void, CronException>() {
+      @Override
+      public Void apply(Storage.StoreProvider storeProvider) throws CronException {
+        checkCronExists(jobKey, storeProvider.getJobStore());
+        triggerJob(jobKey);
+        return null;
+      }
+    });
+  }
+
+  private void triggerJob(IJobKey jobKey) throws CronException {
+    try {
+      scheduler.triggerJob(Quartz.jobKey(jobKey));
+    } catch (SchedulerException e) {
+      throw new CronException(e);
+    }
+    LOG.info(String.format("Triggered cron job for %s.", JobKeys.canonicalString(jobKey)));
+  }
+
+  private static void checkNoRunOverlap(SanitizedCronJob cronJob) throws CronException {
+    // NOTE: We check at create and update instead of in SanitizedCronJob to allow existing jobs
+    // but reject new ones.
+    if (CronCollisionPolicy.RUN_OVERLAP.equals(cronJob.getCronCollisionPolicy())) {
+      throw new CronException(
+          "The RUN_OVERLAP collision policy has been removed (AURORA-38).");
+    }
+  }
+
+  @Override
+  public void updateJob(final SanitizedCronJob config) throws CronException {
+    checkNotNull(config);
+    checkNoRunOverlap(config);
+
+    final IJobKey jobKey = config.getSanitizedConfig().getJobConfig().getKey();
+    storage.write(new MutateWork.NoResult<CronException>() {
+      @Override
+      public void execute(Storage.MutableStoreProvider storeProvider) throws CronException {
+        checkCronExists(jobKey, storeProvider.getJobStore());
+
+        removeJob(jobKey, storeProvider.getJobStore());
+        descheduleJob(jobKey);
+        saveJob(config, storeProvider.getJobStore());
+        scheduleJob(config.getCrontabEntry(), jobKey);
+      }
+    });
+  }
+
+  @Override
+  public void createJob(final SanitizedCronJob cronJob) throws CronException {
+    checkNotNull(cronJob);
+    checkNoRunOverlap(cronJob);
+
+    final IJobKey jobKey = cronJob.getSanitizedConfig().getJobConfig().getKey();
+    storage.write(new MutateWork.NoResult<CronException>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) throws CronException {
+        checkNotExists(jobKey, storeProvider.getJobStore());
+
+        saveJob(cronJob, storeProvider.getJobStore());
+        scheduleJob(cronJob.getCrontabEntry(), jobKey);
+      }
+    });
+  }
+
+  private void checkNotExists(IJobKey jobKey, JobStore jobStore) throws CronException {
+    if (jobStore.fetchJob(getManagerKey(), jobKey).isPresent()) {
+      throw new CronException(
+          String.format("Job already exists for %s.", JobKeys.canonicalString(jobKey)));
+    }
+  }
+
+  private void checkCronExists(IJobKey jobKey, JobStore jobStore) throws CronException {
+    if (!jobStore.fetchJob(getManagerKey(), jobKey).isPresent()) {
+      throw new CronException(
+          String.format("No cron template found for %s.", JobKeys.canonicalString(jobKey)));
+    }
+  }
+
+  private void removeJob(IJobKey jobKey, JobStore.Mutable jobStore) {
+    jobStore.removeJob(jobKey);
+    LOG.info(
+        String.format("Deleted cron job %s from storage.", JobKeys.canonicalString(jobKey)));
+  }
+
+  private void saveJob(SanitizedCronJob cronJob, JobStore.Mutable jobStore) {
+    IJobConfiguration config = cronJob.getSanitizedConfig().getJobConfig();
+
+    jobStore.saveAcceptedJob(getManagerKey(), config);
+    LOG.info(String.format(
+        "Saved new cron job %s to storage.", JobKeys.canonicalString(config.getKey())));
+  }
+
+  // TODO(ksweeney): Consider exposing this in the interface and making caller responsible.
+  void scheduleJob(CrontabEntry crontabEntry, IJobKey jobKey) throws CronException {
+    try {
+      scheduler.scheduleJob(
+          Quartz.jobDetail(jobKey, AuroraCronJob.class),
+          Quartz.cronTrigger(crontabEntry, timeZone));
+    } catch (SchedulerException e) {
+      throw new CronException(e);
+    }
+    LOG.info(String.format(
+        "Scheduled job %s with schedule %s.", JobKeys.canonicalString(jobKey), crontabEntry));
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> getJobs() {
+    // NOTE: no synchronization is needed here since we don't touch internal quartz state.
+    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
+      @Override
+      public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
+        return storeProvider.getJobStore().fetchJobs(getManagerKey());
+      }
+    });
+  }
+
+  @Override
+  public boolean hasJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    return storage.consistentRead(new Work.Quiet<Boolean>() {
+      @Override
+      public Boolean apply(Storage.StoreProvider storeProvider) {
+        return storeProvider.getJobStore().fetchJob(getManagerKey(), jobKey).isPresent();
+      }
+    });
+  }
+
+  @Override
+  public boolean deleteJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override
+      public Boolean apply(Storage.MutableStoreProvider storeProvider) {
+        if (!storeProvider.getJobStore().fetchJob(getManagerKey(), jobKey).isPresent()) {
+          return false;
+        }
+
+        removeJob(jobKey, storeProvider.getJobStore());
+        descheduleJob(jobKey);
+        return true;
+      }
+    });
+  }
+
+  private void descheduleJob(IJobKey jobKey) {
+    String path = JobKeys.canonicalString(jobKey);
+    try {
+      // TODO(ksweeney): Consider interrupting the running job here.
+      // There's a race here where an old running job could fail to find the old config. That's
+      // fine given that the behavior of AuroraCronJob is to log an error and exit if it's unable
+      // to find a job for its key.
+      scheduler.deleteJob(Quartz.jobKey(jobKey));
+      LOG.info("Successfully descheduled " + path + ".");
+    } catch (SchedulerException e) {
+      LOG.log(Level.WARNING, "Error when attempting to deschedule " + path + ": " + e, e);
+    }
+  }
+
+  @Override
+  public Map<IJobKey, CrontabEntry> getScheduledJobs() {
+    // NOTE: no synchronization is needed here since this is just a dump of internal quartz state
+    // for debugging.
+    ImmutableMap.Builder<IJobKey, CrontabEntry> scheduledJobs = ImmutableMap.builder();
+    try {
+      for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.<JobKey>anyGroup())) {
+        Optional<JobDetail> jobDetail = Optional.fromNullable(scheduler.getJobDetail(jobKey));
+        if (jobDetail.isPresent()) {
+          scheduledJobs.put(
+              Quartz.auroraJobKey(jobKey), CrontabEntry.parse(jobDetail.get().getDescription()));
+        }
+      }
+    } catch (SchedulerException e) {
+      throw Throwables.propagate(e);
+    }
+    return scheduledJobs.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
new file mode 100644
index 0000000..64fa068
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.quartz.Scheduler;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manager for startup and teardown of Quartz scheduler.
+ */
+class CronLifecycle extends AbstractIdleService implements PubsubEvent.EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(CronLifecycle.class.getName());
+
+  private static final AtomicInteger RUNNING_FLAG = Stats.exportInt("quartz_scheduler_running");
+  private static final AtomicInteger LOADED_FLAG = Stats.exportInt("cron_jobs_loaded");
+  private static final AtomicLong LAUNCH_FAILURES = Stats.exportLong("cron_job_launch_failures");
+
+  private final Scheduler scheduler;
+  private final ShutdownRegistry shutdownRegistry;
+  private final CronJobManagerImpl cronJobManager;
+
+  @Inject
+  CronLifecycle(
+      Scheduler scheduler,
+      ShutdownRegistry shutdownRegistry,
+      CronJobManagerImpl cronJobManager) {
+
+    this.scheduler = checkNotNull(scheduler);
+    this.shutdownRegistry = checkNotNull(shutdownRegistry);
+    this.cronJobManager = checkNotNull(cronJobManager);
+  }
+
+  /**
+   * Notifies the cronScheduler job manager that the scheduler is active, and job configurations
+   * are ready to load.
+   *
+   * @param schedulerActive Event.
+   */
+  @Subscribe
+  public void schedulerActive(PubsubEvent.SchedulerActive schedulerActive) {
+    startAsync();
+    shutdownRegistry.addAction(new Command() {
+      @Override
+      public void execute() {
+        CronLifecycle.this.stopAsync().awaitTerminated();
+      }
+    });
+    awaitRunning();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting Quartz cron scheduler " + scheduler.getSchedulerName() + ".");
+    scheduler.start();
+    RUNNING_FLAG.set(1);
+
+    // TODO(ksweeney): Refactor the interface - we really only need the job keys here.
+    for (IJobConfiguration job : cronJobManager.getJobs()) {
+      try {
+        SanitizedCronJob cronJob = SanitizedCronJob.fromUnsanitized(job);
+        cronJobManager.scheduleJob(
+            cronJob.getCrontabEntry(),
+            cronJob.getSanitizedConfig().getJobConfig().getKey());
+      } catch (CronException | ConfigurationManager.TaskDescriptionException e) {
+        logLaunchFailure(job, e);
+      }
+    }
+    LOADED_FLAG.set(1);
+  }
+
+  private void logLaunchFailure(IJobConfiguration job, Exception e) {
+    LAUNCH_FAILURES.incrementAndGet();
+    LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Shutting down Quartz cron scheduler.");
+    scheduler.shutdown();
+    RUNNING_FLAG.set(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
new file mode 100644
index 0000000..6934828
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Throwables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.DirectSchedulerFactory;
+import org.quartz.simpl.RAMJobStore;
+import org.quartz.simpl.SimpleThreadPool;
+
+/**
+ * Provides a {@link CronJobManager} with a Quartz backend. While Quartz itself supports
+ * persistence, the scheduler exposed by this module does not persist any state - it simply
+ * creates tasks from a {@link org.apache.aurora.gen.JobConfiguration} template on a cron-style
+ * schedule.
+ */
+public class CronModule extends AbstractModule {
+  private static final Logger LOG = Logger.getLogger(CronModule.class.getName());
+
+  // ID reported by the zone that TimeZone.getTimeZone substitutes for an ID it cannot understand.
+  private static final String FALLBACK_TIMEZONE_ID = "GMT";
+
+  @CmdLine(name = "cron_scheduler_num_threads",
+      help = "Number of threads to use for the cron scheduler thread pool.")
+  private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+
+  @CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
+  private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
+
+  @CmdLine(name = "cron_start_initial_backoff", help =
+      "Initial backoff delay while waiting for a previous cron run to be killed.")
+  public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
+      Arg.create(Amount.of(1L, Time.SECONDS));
+
+  @CmdLine(name = "cron_start_max_backoff", help =
+      "Max backoff delay while waiting for a previous cron run to be killed.")
+  public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  // Global per-JVM ID number generator for the provided Quartz Scheduler.
+  private static final AtomicLong ID_GENERATOR = new AtomicLong();
+
+  /**
+   * Binds the cron interfaces to their Quartz-backed implementations as singletons, binds the
+   * {@link AuroraCronJob} configuration built from the backoff flags, and registers
+   * {@link CronLifecycle} as a pubsub subscriber.
+   */
+  @Override
+  protected void configure() {
+    bind(CronPredictor.class).to(CronPredictorImpl.class);
+    bind(CronPredictorImpl.class).in(Singleton.class);
+
+    bind(CronJobManager.class).to(CronJobManagerImpl.class);
+    bind(CronJobManagerImpl.class).in(Singleton.class);
+
+    bind(CronScheduler.class).to(CronSchedulerImpl.class);
+    bind(CronSchedulerImpl.class).in(Singleton.class);
+
+    bind(AuroraCronJobFactory.class).in(Singleton.class);
+
+    bind(AuroraCronJob.class).in(Singleton.class);
+    bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
+        new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
+
+    bind(CronLifecycle.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(binder(), CronLifecycle.class);
+  }
+
+  /**
+   * Provides the timezone named by the {@code cron_timezone} flag.
+   *
+   * Logs a warning when the flag value is not understood (in which case the JDK substitutes
+   * GMT) and when the resulting timezone differs from the system default.
+   *
+   * @return The timezone resolved from the flag value.
+   */
+  @Provides
+  private TimeZone provideTimeZone() {
+    String requestedId = CRON_TIMEZONE.get();
+    TimeZone timeZone = TimeZone.getTimeZone(requestedId);
+    // TimeZone.getTimeZone never fails: an ID it cannot understand silently becomes GMT. Surface
+    // that case so a mistyped flag value does not go unnoticed.
+    if (FALLBACK_TIMEZONE_ID.equals(timeZone.getID())
+        && !FALLBACK_TIMEZONE_ID.equals(requestedId)) {
+      LOG.warning("Unrecognized cron_timezone value '" + requestedId
+          + "'; cron schedules will use " + timeZone.getDisplayName() + " instead.");
+    }
+
+    TimeZone systemTimeZone = TimeZone.getDefault();
+    if (!timeZone.equals(systemTimeZone)) {
+      LOG.warning("Cron schedules are configured to fire according to timezone "
+          + timeZone.getDisplayName()
+          + " but system timezone is set to "
+          + systemTimeZone.getDisplayName());
+    }
+    return timeZone;
+  }
+
+  /*
+   * NOTE: Quartz implements DirectSchedulerFactory as a mutable global singleton in a static
+   * variable. While the Scheduler instances it produces are independent we synchronize here to
+   * avoid an initialization race across injectors. In practice this only shows up during testing;
+   * production Aurora instances will only have one object graph at a time.
+   *
+   * The scheduler is created with a daemon-thread SimpleThreadPool sized by the
+   * cron_scheduler_num_threads flag, an in-memory RAMJobStore, a name made unique per JVM by
+   * ID_GENERATOR, and the injected job factory. A SchedulerException during creation is logged
+   * and rethrown unchecked.
+   */
+  @Provides
+  @Singleton
+  private static synchronized Scheduler provideScheduler(AuroraCronJobFactory jobFactory) {
+    SimpleThreadPool threadPool = new SimpleThreadPool(NUM_THREADS.get(), Thread.NORM_PRIORITY);
+    threadPool.setMakeThreadsDaemons(true);
+
+    DirectSchedulerFactory schedulerFactory = DirectSchedulerFactory.getInstance();
+    // The same generated name is used as both the scheduler name and its instance ID.
+    String schedulerName = "aurora-cron-" + ID_GENERATOR.incrementAndGet();
+    try {
+      schedulerFactory.createScheduler(schedulerName, schedulerName, threadPool, new RAMJobStore());
+      Scheduler scheduler = schedulerFactory.getScheduler(schedulerName);
+      scheduler.setJobFactory(jobFactory);
+      return scheduler;
+    } catch (SchedulerException e) {
+      LOG.severe("Error initializing Quartz cron scheduler: " + e);
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
new file mode 100644
index 0000000..fb24c28
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.TimeZone;
+
+import javax.inject.Inject;
+
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.quartz.CronExpression;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+class CronPredictorImpl implements CronPredictor {
+  private final Clock clock;
+  private final TimeZone timeZone;
+
+  /**
+   * @param clock Source of the current time used as the starting point for predictions.
+   * @param timeZone Timezone in which cron schedules are evaluated.
+   */
+  @Inject
+  CronPredictorImpl(Clock clock, TimeZone timeZone) {
+    this.clock = checkNotNull(clock);
+    this.timeZone = checkNotNull(timeZone);
+  }
+
+  /**
+   * Asks Quartz for the next time after "now" (as reported by the clock) that satisfies the
+   * given schedule in the configured timezone.
+   */
+  @Override
+  public Date predictNextRun(CrontabEntry schedule) {
+    CronExpression quartzExpression = Quartz.cronExpression(schedule, timeZone);
+    Date now = new Date(clock.nowMillis());
+    return quartzExpression.getNextValidTimeAfter(now);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
new file mode 100644
index 0000000..3bef22d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.twitter.common.base.Function;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.CronTrigger;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.cron.quartz.Quartz.jobKey;
+
+class CronSchedulerImpl implements CronScheduler {
+  private static final Logger LOG = Logger.getLogger(CronSchedulerImpl.class.getName());
+
+  private final Scheduler scheduler;
+
+  /**
+   * @param scheduler Quartz scheduler whose triggers are inspected.
+   */
+  @Inject
+  CronSchedulerImpl(Scheduler scheduler) {
+    this.scheduler = checkNotNull(scheduler);
+  }
+
+  /**
+   * Looks up the cron schedule that Quartz currently holds for a job.
+   *
+   * @param jobKey Key of the job to look up.
+   * @return The schedule of the job's single cron trigger, or absent if the job has no cron
+   *         trigger or Quartz could not be queried (the latter is logged).
+   * @throws IllegalArgumentException If the job has more than one cron trigger.
+   */
+  @Override
+  public Optional<CrontabEntry> getSchedule(IJobKey jobKey) throws IllegalStateException {
+    checkNotNull(jobKey);
+
+    try {
+      // The default-value overload returns null rather than throwing NoSuchElementException
+      // when there is no cron trigger for the job; more than one trigger is still an error.
+      CrontabEntry schedule = Iterables.getOnlyElement(
+          FluentIterable.from(scheduler.getTriggersOfJob(jobKey(jobKey)))
+              .filter(CronTrigger.class)
+              .transform(new Function<CronTrigger, CrontabEntry>() {
+                @Override
+                public CrontabEntry apply(CronTrigger trigger) {
+                  return Quartz.crontabEntry(trigger);
+                }
+              }),
+          null);
+      return Optional.fromNullable(schedule);
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE,
+          "Error reading job " + JobKeys.canonicalString(jobKey) + " cronExpression Quartz: " + e,
+          e);
+      return Optional.absent();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
new file mode 100644
index 0000000..63aaade
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
@@ -0,0 +1,124 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.TimeZone;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.TriggerBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Utilities for converting Aurora datatypes to Quartz datatypes.
+ */
+final class Quartz {
+  private Quartz() {
+    // Utility class.
+  }
+
+  /**
+   * Convert an Aurora CrontabEntry to a Quartz CronExpression.
+   *
+   * The generated expression always fires at second 0. A wildcard day-of-month is emitted as
+   * "?"; day-of-week is emitted as "?" only when it is a wildcard and day-of-month is not,
+   * otherwise it is expanded to an explicit list of Quartz day numbers. A ParseException from
+   * Quartz is rethrown unchecked.
+   */
+  static CronExpression cronExpression(CrontabEntry entry, TimeZone timeZone) {
+    // "?" is the special quartz token meaning "don't care".
+    String dayOfMonthField =
+        entry.hasWildcardDayOfMonth() ? "?" : entry.getDayOfMonthAsString();
+
+    String dayOfWeekField;
+    if (!entry.hasWildcardDayOfWeek() || entry.hasWildcardDayOfMonth()) {
+      List<Integer> quartzDays = Lists.newArrayList();
+      for (Range<Integer> dayRange : entry.getDayOfWeek().asRanges()) {
+        for (int day : ContiguousSet.create(dayRange, DiscreteDomain.integers())) {
+          // Quartz has an off-by-one with what the "standard" defines.
+          quartzDays.add(day + 1);
+        }
+      }
+      dayOfWeekField = Joiner.on(",").join(quartzDays);
+    } else {
+      dayOfWeekField = "?";
+    }
+
+    // Quartz field order: seconds, minutes, hours, day-of-month, month, day-of-week.
+    String quartzExpression = Joiner.on(" ").join(
+        "0",
+        entry.getMinuteAsString(),
+        entry.getHourAsString(),
+        dayOfMonthField,
+        entry.getMonthAsString(),
+        dayOfWeekField);
+    try {
+      CronExpression parsed = new CronExpression(quartzExpression);
+      parsed.setTimeZone(timeZone);
+      return parsed;
+    } catch (ParseException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Convert a Quartz JobKey to an Aurora IJobKey by parsing the Quartz key's name.
+   */
+  static IJobKey auroraJobKey(JobKey jobKey) {
+    String canonicalName = jobKey.getName();
+    return JobKeys.parse(canonicalName);
+  }
+
+  /**
+   * Convert an Aurora IJobKey to a Quartz JobKey named after the Aurora key's canonical string.
+   */
+  static JobKey jobKey(IJobKey jobKey) {
+    String canonicalName = JobKeys.canonicalString(jobKey);
+    return JobKey.jobKey(canonicalName);
+  }
+
+  /**
+   * Build a Quartz CronTrigger for a schedule. The schedule's string form is stored as the
+   * trigger description, which is what {@link #crontabEntry(CronTrigger)} reads back.
+   */
+  static CronTrigger cronTrigger(CrontabEntry schedule, TimeZone timeZone) {
+    CronScheduleBuilder quartzSchedule =
+        CronScheduleBuilder.cronSchedule(cronExpression(schedule, timeZone));
+    return TriggerBuilder.newTrigger()
+        .withSchedule(quartzSchedule)
+        .withDescription(schedule.toString())
+        .build();
+  }
+
+  /**
+   * Build a Quartz JobDetail for the given job class, identified by the Aurora job key.
+   */
+  static JobDetail jobDetail(IJobKey jobKey, Class<? extends Job> jobClass) {
+    checkNotNull(jobKey);
+    checkNotNull(jobClass);
+
+    JobKey quartzKey = jobKey(jobKey);
+    return JobBuilder.newJob(jobClass).withIdentity(quartzKey).build();
+  }
+
+  /**
+   * Recover the Aurora CrontabEntry from a trigger by parsing the trigger's description.
+   */
+  static CrontabEntry crontabEntry(CronTrigger cronTrigger) {
+    String description = cronTrigger.getDescription();
+    return CrontabEntry.parse(description);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java b/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
deleted file mode 100644
index 61b01d2..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.testing;
-
-import java.io.InputStreamReader;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test to verify conformance with the {@link CronScheduler} interface.
- */
-public abstract class AbstractCronIT {
-  private static final String WILDCARD_SCHEDULE = "* * * * *";
-
-  /**
-   * Child should return an instance of the {@link CronScheduler} test under test here.
-   */
-  protected abstract CronScheduler makeCronScheduler() throws Exception;
-
-  /**
-   * Child should return an instance of the {@link CronPredictor} under test here.
-   *
-   * @param clock The clock the predictor should use.
-   */
-  protected abstract CronPredictor makeCronPredictor(Clock clock) throws Exception;
-
-  @Test
-  public void testCronSchedulerLifecycle() throws Exception {
-    CronScheduler scheduler = makeCronScheduler();
-
-    scheduler.startAsync().awaitRunning();
-    final CountDownLatch cronRan = new CountDownLatch(1);
-    scheduler.schedule(WILDCARD_SCHEDULE, new Runnable() {
-      @Override public void run() {
-        cronRan.countDown();
-      }
-    });
-    cronRan.await();
-    scheduler.stopAsync().awaitTerminated();
-  }
-
-  @Test
-  public void testCronPredictorConforms() throws Exception {
-    FakeClock clock = new FakeClock();
-    CronPredictor cronPredictor = makeCronPredictor(clock);
-
-    for (TriggerPrediction triggerPrediction : getExpectedTriggerPredictions()) {
-      List<Long> results = Lists.newArrayList();
-      clock.setNowMillis(0);
-      for (int i = 0; i < triggerPrediction.getTriggerTimes().size(); i++) {
-        Date nextTriggerTime = cronPredictor.predictNextRun(triggerPrediction.getSchedule());
-        results.add(nextTriggerTime.getTime());
-        clock.setNowMillis(nextTriggerTime.getTime());
-      }
-      assertEquals("Cron schedule "
-          + triggerPrediction.getSchedule()
-          + " should have have predicted trigger times "
-          + triggerPrediction.getTriggerTimes()
-          + " but predicted "
-          + results
-          + " instead.", triggerPrediction.getTriggerTimes(), results);
-    }
-  }
-
-  @Test
-  public void testCronScheduleValidatorAcceptsValidSchedules() throws Exception {
-    CronScheduler cron = makeCronScheduler();
-
-    for (TriggerPrediction triggerPrediction : getExpectedTriggerPredictions()) {
-      assertTrue("Cron schedule " + triggerPrediction.getSchedule() + " should pass validation.",
-          cron.isValidSchedule(triggerPrediction.getSchedule()));
-    }
-  }
-
-  private static List<TriggerPrediction> getExpectedTriggerPredictions() {
-    return new Gson()
-        .fromJson(
-            new InputStreamReader(
-                AbstractCronIT.class.getResourceAsStream("cron-schedule-predictions.json")),
-            new TypeToken<List<TriggerPrediction>>() { }.getType());
-  }
-
-  /**
-   * A schedule and the expected iteratively-applied prediction results.
-   */
-  public static class TriggerPrediction {
-    private String schedule;
-    private List<Long> triggerTimes;
-
-    private TriggerPrediction() {
-      // GSON constructor.
-    }
-
-    public TriggerPrediction(String schedule, List<Long> triggerTimes) {
-      this.schedule = schedule;
-      this.triggerTimes = triggerTimes;
-    }
-
-    public String getSchedule() {
-      return schedule;
-    }
-
-    public List<Long> getTriggerTimes() {
-      return ImmutableList.copyOf(triggerTimes);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/http/Cron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Cron.java b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
index 80a398a..d8c44f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Cron.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
@@ -26,7 +26,7 @@ import javax.ws.rs.core.Response;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.aurora.scheduler.state.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 
 /**
  * HTTP interface to dump state of the internal cron scheduler.
@@ -50,7 +50,6 @@ public class Cron {
   public Response dumpContents() {
     Map<String, Object> response = ImmutableMap.<String, Object>builder()
         .put("scheduled", cronManager.getScheduledJobs())
-        .put("pending", cronManager.getPendingRuns())
         .build();
 
    return Response.ok(response).build();