You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/05/15 04:34:44 UTC

[4/5] CronScheduler based on Quartz

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
index 9831012..6d76f60 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
@@ -39,8 +39,8 @@ import com.twitter.common.net.pool.DynamicHostSet;
 import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
 import com.twitter.thrift.ServiceInstance;
 
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index efea75f..823668f 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -25,7 +25,6 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.twitter.common.base.Closure;
 import com.twitter.common.thrift.Util;
@@ -34,7 +33,7 @@ import org.antlr.stringtemplate.StringTemplate;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.state.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -43,6 +42,8 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.thrift.TBase;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Servlet that prints out the raw configuration for a specified struct.
  */
@@ -50,11 +51,13 @@ import org.apache.thrift.TBase;
 public class StructDump extends JerseyTemplateServlet {
 
   private final Storage storage;
+  private final CronJobManager cronJobManager;
 
   @Inject
-  public StructDump(Storage storage) {
+  public StructDump(Storage storage, CronJobManager cronJobManager) {
     super("structdump");
-    this.storage = Preconditions.checkNotNull(storage);
+    this.storage = checkNotNull(storage);
+    this.cronJobManager = checkNotNull(cronJobManager);
   }
 
   private static final String USAGE =
@@ -106,11 +109,11 @@ public class StructDump extends JerseyTemplateServlet {
       @PathParam("job") final String job) {
 
     final IJobKey jobKey = JobKeys.from(role, environment, job);
-    return dumpEntity("Cron job " + JobKeys.toPath(jobKey),
+    return dumpEntity("Cron job " + JobKeys.canonicalString(jobKey),
         new Work.Quiet<Optional<? extends TBase<?, ?>>>() {
           @Override
           public Optional<JobConfiguration> apply(StoreProvider storeProvider) {
-            return storeProvider.getJobStore().fetchJob(CronJobManager.MANAGER_KEY, jobKey)
+            return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(), jobKey)
                 .transform(IJobConfiguration.TO_BUILDER);
           }
         });

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
index ff7e3cb..46be612 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -119,7 +119,7 @@ interface SlaGroup {
       return Multimaps.index(tasks, Functions.compose(new Function<IJobKey, String>() {
         @Override
         public String apply(IJobKey jobKey) {
-          return "sla_" + JobKeys.toPath(jobKey) + "_";
+          return "sla_" + JobKeys.canonicalString(jobKey) + "_";
         }
       }, Tasks.SCHEDULED_TO_JOB_KEY));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
deleted file mode 100644
index 4bd190c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.BackoffHelper;
-
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.commons.lang.StringUtils;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-
-/**
- * A job scheduler that receives jobs that should be run periodically on a cron schedule.
- */
-public class CronJobManager implements EventSubscriber {
-
-  public static final String MANAGER_KEY = "CRON";
-
-  @VisibleForTesting
-  static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cron");
-
-  private static final Logger LOG = Logger.getLogger(CronJobManager.class.getName());
-
-  @CmdLine(name = "cron_start_initial_backoff", help =
-      "Initial backoff delay while waiting for a previous cron run to start.")
-  private static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
-      Arg.create(Amount.of(1L, Time.SECONDS));
-
-  @CmdLine(name = "cron_start_max_backoff", help =
-      "Max backoff delay while waiting for a previous cron run to start.")
-  private static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  private final AtomicLong cronJobsTriggered = Stats.exportLong("cron_jobs_triggered");
-  private final AtomicLong cronJobLaunchFailures = Stats.exportLong("cron_job_launch_failures");
-
-  // Maps from the unique job identifier to the unique identifier used internally by the cron
-  // scheduler.
-  private final Map<IJobKey, String> scheduledJobs =
-      Collections.synchronizedMap(Maps.<IJobKey, String>newHashMap());
-
-  // Prevents runs from dogpiling while waiting for a run to transition out of the KILLING state.
-  // This is necessary because killing a job (if dictated by cron collision policy) is an
-  // asynchronous operation.
-  private final Map<IJobKey, SanitizedConfiguration> pendingRuns =
-      Collections.synchronizedMap(Maps.<IJobKey, SanitizedConfiguration>newHashMap());
-
-  private final StateManager stateManager;
-  private final Storage storage;
-  private final CronScheduler cron;
-  private final ShutdownRegistry shutdownRegistry;
-  private final BackoffHelper delayedStartBackoff;
-  private final Executor delayedRunExecutor;
-
-  @Inject
-  CronJobManager(
-      StateManager stateManager,
-      Storage storage,
-      CronScheduler cron,
-      ShutdownRegistry shutdownRegistry) {
-
-    this(
-        stateManager,
-        storage,
-        cron,
-        shutdownRegistry,
-        Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CronDelay-%d").build()));
-  }
-
-  @VisibleForTesting
-  CronJobManager(
-      StateManager stateManager,
-      Storage storage,
-      CronScheduler cron,
-      ShutdownRegistry shutdownRegistry,
-      Executor delayedRunExecutor) {
-
-    this.stateManager = checkNotNull(stateManager);
-    this.storage = checkNotNull(storage);
-    this.cron = checkNotNull(cron);
-    this.shutdownRegistry = checkNotNull(shutdownRegistry);
-    this.delayedStartBackoff =
-        new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get());
-    this.delayedRunExecutor = checkNotNull(delayedRunExecutor);
-
-    Stats.exportSize("cron_num_pending_runs", pendingRuns);
-  }
-
-  private void mapScheduledJob(SanitizedCronJob cronJob) throws ScheduleException {
-    IJobKey jobKey = cronJob.config.getJobConfig().getKey();
-    synchronized (scheduledJobs) {
-      Preconditions.checkState(
-          !scheduledJobs.containsKey(jobKey),
-          "Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
-      scheduledJobs.put(jobKey, scheduleJob(cronJob));
-    }
-  }
-
-  /**
-   * Notifies the cron job manager that the scheduler is active, and job configurations are ready to
-   * load.
-   *
-   * @param schedulerActive Event.
-   */
-  @Subscribe
-  public void schedulerActive(SchedulerActive schedulerActive) {
-    cron.startAsync().awaitRunning();
-    shutdownRegistry.addAction(new Command() {
-      @Override
-      public void execute() {
-        // NOTE: We don't know ahead-of-time which thread will execute the shutdown command,
-        // so we shouldn't block here.
-        cron.stopAsync();
-      }
-    });
-
-    Iterable<IJobConfiguration> crons =
-        storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
-          @Override
-          public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
-            return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
-          }
-        });
-
-    for (IJobConfiguration job : crons) {
-      try {
-        mapScheduledJob(new SanitizedCronJob(job, cron));
-      } catch (ScheduleException | TaskDescriptionException e) {
-        logLaunchFailure(job, e);
-      }
-    }
-  }
-
-  private void logLaunchFailure(IJobConfiguration job, Exception e) {
-    cronJobLaunchFailures.incrementAndGet();
-    LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
-  }
-
-  /**
-   * Triggers execution of a job.
-   *
-   * @param jobKey Key of the job to start.
-   * @throws ScheduleException If the job could not be started with the cron system.
-   * @throws TaskDescriptionException If the stored job associated with {@code jobKey} has field
-   *         validation problems.
-   */
-  public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException {
-    Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
-    if (!jobConfig.isPresent()) {
-      throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
-    }
-
-    cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
-  }
-
-  private void delayedRun(final Query.Builder query, final SanitizedConfiguration config) {
-    IJobConfiguration job = config.getJobConfig();
-    final String jobPath = JobKeys.toPath(job);
-    final IJobKey jobKey = job.getKey();
-    LOG.info("Waiting for job to terminate before launching cron job " + jobPath);
-    if (pendingRuns.put(jobKey, config) == null) {
-      LOG.info("Launching a task to wait for job to finish: " + jobPath);
-      // There was no run already pending for this job, launch a task to delay launch until the
-      // existing run has terminated.
-      delayedRunExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          runWhenTerminated(query, jobKey);
-        }
-      });
-    }
-  }
-
-  private void runWhenTerminated(final Query.Builder query, final IJobKey jobKey) {
-    try {
-      delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
-            LOG.info("Initiating delayed launch of cron " + jobKey);
-            SanitizedConfiguration config = pendingRuns.remove(jobKey);
-            checkNotNull(config, "Failed to fetch job for delayed run of " + jobKey);
-            LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
-            stateManager.insertPendingTasks(config.getTaskConfigs());
-            return true;
-          } else {
-            LOG.info("Not yet safe to run cron " + jobKey);
-            return false;
-          }
-        }
-      });
-    } catch (InterruptedException e) {
-      LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + jobKey, e);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  private void killActiveTasks(Set<String> taskIds) {
-    if (taskIds.isEmpty()) {
-      return;
-    }
-
-    for (String taskId : taskIds) {
-      stateManager.changeState(
-          taskId,
-          Optional.<ScheduleStatus>absent(),
-          KILLING,
-          KILL_AUDIT_MESSAGE);
-    }
-  }
-
-  public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
-    return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
-  }
-
-  /**
-   * Triggers execution of a cron job, depending on the cron collision policy for the job.
-   *
-   * @param cronJob The job to be triggered.
-   */
-  private void cronTriggered(SanitizedCronJob cronJob) {
-    final SanitizedConfiguration config = cronJob.config;
-    final IJobConfiguration job = config.getJobConfig();
-    LOG.info(String.format("Cron triggered for %s at %s with policy %s",
-        JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
-    cronJobsTriggered.incrementAndGet();
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
-        ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
-        Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
-        Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
-
-        if (activeTasks.isEmpty()) {
-          builder.putAll(config.getTaskConfigs());
-        } else {
-          switch (orDefault(job.getCronCollisionPolicy())) {
-            case KILL_EXISTING:
-              killActiveTasks(activeTasks);
-              delayedRun(activeQuery, config);
-              break;
-
-            case CANCEL_NEW:
-              break;
-
-            case RUN_OVERLAP:
-              LOG.severe(String.format(
-                  "Ignoring trigger for job %s with deprecated collision"
-                  + "policy RUN_OVERLAP due to unterminated active tasks.",
-                  JobKeys.toPath(job)));
-              break;
-
-            default:
-              LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
-          }
-        }
-
-        Map<Integer, ITaskConfig> newTasks = builder.build();
-        if (!newTasks.isEmpty()) {
-          stateManager.insertPendingTasks(newTasks);
-        }
-      }
-    });
-  }
-
-  /**
-   * Updates (re-schedules) the existing cron job.
-   *
-   * @param config New job configuration to update to.
-   * @throws ScheduleException If non-cron job confuration provided.
-   */
-  public void updateJob(SanitizedConfiguration config) throws ScheduleException {
-    IJobConfiguration job = config.getJobConfig();
-    if (!hasCronSchedule(job)) {
-      throw new ScheduleException("A cron job may not be updated to a non-cron job.");
-    }
-    String key = scheduledJobs.remove(job.getKey());
-    if (key == null) {
-      throw new ScheduleException(
-          "No cron template found for the given key: " + JobKeys.toPath(job));
-    }
-    cron.deschedule(key);
-    checkArgument(receiveJob(config));
-  }
-
-  private static boolean hasCronSchedule(IJobConfiguration job) {
-    checkNotNull(job);
-    return !StringUtils.isEmpty(job.getCronSchedule());
-  }
-
-  public boolean receiveJob(SanitizedConfiguration config) throws ScheduleException {
-    final IJobConfiguration job = config.getJobConfig();
-    if (!hasCronSchedule(job)) {
-      return false;
-    }
-
-    if (CronCollisionPolicy.RUN_OVERLAP.equals(job.getCronCollisionPolicy())) {
-      throw new ScheduleException(
-          "The RUN_OVERLAP collision policy has been removed (AURORA-38).");
-    }
-
-    SanitizedCronJob cronJob = new SanitizedCronJob(config, cron);
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
-        storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
-      }
-    });
-    mapScheduledJob(cronJob);
-
-    return true;
-  }
-
-  private String scheduleJob(final SanitizedCronJob cronJob) throws ScheduleException {
-    IJobConfiguration job = cronJob.config.getJobConfig();
-    final String jobPath = JobKeys.toPath(job);
-    LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
-    try {
-      return cron.schedule(job.getCronSchedule(), new Runnable() {
-        @Override
-        public void run() {
-          // TODO(William Farner): May want to record information about job runs.
-          LOG.info("Running cron job: " + jobPath);
-          cronTriggered(cronJob);
-        }
-      });
-    } catch (CronException e) {
-      throw new ScheduleException("Failed to schedule cron job: " + e.getMessage(), e);
-    }
-  }
-
-  public Iterable<IJobConfiguration> getJobs() {
-    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
-      @Override
-      public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
-      }
-    });
-  }
-
-  public boolean hasJob(IJobKey jobKey) {
-    return fetchJob(jobKey).isPresent();
-  }
-
-  private Optional<IJobConfiguration> fetchJob(final IJobKey jobKey) {
-    checkNotNull(jobKey);
-    return storage.consistentRead(new Work.Quiet<Optional<IJobConfiguration>>() {
-      @Override
-      public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
-        return storeProvider.getJobStore().fetchJob(MANAGER_KEY, jobKey);
-      }
-    });
-  }
-
-  public boolean deleteJob(final IJobKey jobKey) {
-    Optional<IJobConfiguration> job = fetchJob(jobKey);
-    if (!job.isPresent()) {
-      return false;
-    }
-
-    String scheduledJobKey = scheduledJobs.remove(jobKey);
-    if (scheduledJobKey != null) {
-      cron.deschedule(scheduledJobKey);
-      storage.write(new MutateWork.NoResult.Quiet() {
-        @Override
-        protected void execute(Storage.MutableStoreProvider storeProvider) {
-          storeProvider.getJobStore().removeJob(jobKey);
-        }
-      });
-      LOG.info("Successfully deleted cron job " + jobKey);
-    }
-    return true;
-  }
-
-  private final Function<String, String> keyToSchedule = new Function<String, String>() {
-    @Override
-    public String apply(String key) {
-      return cron.getSchedule(key).or("Not found.");
-    }
-  };
-
-  public Map<IJobKey, String> getScheduledJobs() {
-    synchronized (scheduledJobs) {
-      return ImmutableMap.copyOf(Maps.transformValues(scheduledJobs, keyToSchedule));
-    }
-  }
-
-  public Set<IJobKey> getPendingRuns() {
-    synchronized (pendingRuns) {
-      return ImmutableSet.copyOf(pendingRuns.keySet());
-    }
-  }
-
-  /**
-   * Used by functions that expect field validation before being called.
-   */
-  private static class SanitizedCronJob {
-    private final SanitizedConfiguration config;
-
-    SanitizedCronJob(IJobConfiguration unsanitized, CronScheduler cron)
-        throws ScheduleException, TaskDescriptionException {
-
-      this(SanitizedConfiguration.fromUnsanitized(unsanitized), cron);
-    }
-
-    SanitizedCronJob(SanitizedConfiguration config, CronScheduler cron) throws ScheduleException {
-      final IJobConfiguration job = config.getJobConfig();
-      if (!hasCronSchedule(job)) {
-        throw new ScheduleException(
-            String.format("Not a valid cronjob, %s has no cron schedule", JobKeys.toPath(job)));
-      }
-
-      if (!cron.isValidSchedule(job.getCronSchedule())) {
-        throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
-      }
-
-      this.config = config;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index 5696485..e3b5b04 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl implements LockManager {
   private static String formatLockKey(ILockKey lockKey) {
     switch (lockKey.getSetField()) {
       case JOB:
-        return JobKeys.toPath(lockKey.getJob());
+        return JobKeys.canonicalString(lockKey.getJob());
       default:
         return "Unknown lock key type: " + lockKey.getSetField();
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index f330599..d377974 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -40,6 +40,9 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -49,6 +52,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.commons.lang.StringUtils;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -71,7 +75,7 @@ class SchedulerCoreImpl implements SchedulerCore {
 
   // TODO(wfarner): Consider changing this class to not be concerned with cron jobs, requiring the
   // caller to deal with the fork.
-  private final CronJobManager cronScheduler;
+  private final CronJobManager cronJobManager;
 
   // State manager handles persistence of task modifications and state transitions.
   private final StateManager stateManager;
@@ -83,7 +87,7 @@ class SchedulerCoreImpl implements SchedulerCore {
    * Creates a new core scheduler.
    *
    * @param storage Backing store implementation.
-   * @param cronScheduler Cron scheduler.
+   * @param cronJobManager Cron scheduler.
    * @param stateManager Persistent state manager.
    * @param taskIdGenerator Task ID generator.
    * @param quotaManager Quota manager.
@@ -91,13 +95,13 @@ class SchedulerCoreImpl implements SchedulerCore {
   @Inject
   public SchedulerCoreImpl(
       Storage storage,
-      CronJobManager cronScheduler,
+      CronJobManager cronJobManager,
       StateManager stateManager,
       TaskIdGenerator taskIdGenerator,
       QuotaManager quotaManager) {
 
     this.storage = checkNotNull(storage);
-    this.cronScheduler = cronScheduler;
+    this.cronJobManager = cronJobManager;
     this.stateManager = checkNotNull(stateManager);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
     this.quotaManager = checkNotNull(quotaManager);
@@ -108,7 +112,20 @@ class SchedulerCoreImpl implements SchedulerCore {
         storage,
         Query.jobScoped(job.getKey()).active()).isEmpty();
 
-    return hasActiveTasks || cronScheduler.hasJob(job.getKey());
+    return hasActiveTasks || cronJobManager.hasJob(job.getKey());
+  }
+
+  private static boolean isCron(SanitizedConfiguration config) {
+    if (!config.getJobConfig().isSetCronSchedule()) {
+      return false;
+    } else if (StringUtils.isEmpty(config.getJobConfig().getCronSchedule())) {
+      // TODO(ksweeney): Remove this in 0.7.0 (AURORA-423).
+      LOG.warning("Got service config with empty string cron schedule. aurora-0.7.x "
+          + "will interpret this as cron job and cause an error.");
+      return false;
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -120,12 +137,19 @@ class SchedulerCoreImpl implements SchedulerCore {
       protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
         final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
         if (hasActiveJob(job)) {
-          throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+          throw new ScheduleException(
+              "Job already exists: " + JobKeys.canonicalString(job.getKey()));
         }
 
         validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
 
-        if (!cronScheduler.receiveJob(sanitizedConfiguration)) {
+        if (isCron(sanitizedConfiguration)) {
+          try {
+            cronJobManager.createJob(SanitizedCronJob.from(sanitizedConfiguration));
+          } catch (CronException e) {
+            throw new ScheduleException(e);
+          }
+        } else {
           LOG.info("Launching " + sanitizedConfiguration.getTaskConfigs().size() + " tasks.");
           stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
         }
@@ -216,7 +240,7 @@ class SchedulerCoreImpl implements SchedulerCore {
       // it.
       // TODO(maxim): Should be trivial to support killing multiple jobs instead.
       IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
-      cronScheduler.deleteJob(jobKey);
+      cronJobManager.deleteJob(jobKey);
     }
 
     // Unless statuses were specifically supplied, only attempt to kill active tasks.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 7d26082..9db2a1a 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -49,17 +49,10 @@ public class StateModule extends AbstractModule {
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
 
-    bindCronJobManager(binder());
     bindMaintenanceController(binder());
   }
 
   @VisibleForTesting
-  static void bindCronJobManager(Binder binder) {
-    binder.bind(CronJobManager.class).in(Singleton.class);
-    PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
-  }
-
-  @VisibleForTesting
   static void bindMaintenanceController(Binder binder) {
     binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
     binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 9bb5c25..f101143 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -94,11 +94,14 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
-import org.apache.aurora.scheduler.state.CronJobManager;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -242,7 +245,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       schedulerCore.createJob(sanitized);
       response.setResponseCode(OK)
           .setMessage(String.format("%d new tasks pending for job %s",
-              sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
+              sanitized.getJobConfig().getInstanceCount(), JobKeys.canonicalString(job.getKey())));
     } catch (LockException e) {
       response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
     } catch (TaskDescriptionException | ScheduleException e) {
@@ -275,11 +278,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           ILockKey.build(LockKey.job(jobKey.newBuilder())),
           Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
 
-      cronJobManager.updateJob(SanitizedConfiguration.fromUnsanitized(job));
+      cronJobManager.updateJob(SanitizedCronJob.fromUnsanitized(job));
       return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
     } catch (LockException e) {
       return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
-    } catch (TaskDescriptionException | ScheduleException e) {
+    } catch (CronException | TaskDescriptionException e) {
       return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
     }
   }
@@ -314,21 +317,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     try {
       sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
-      response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
-      return response;
+      return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
     }
 
     try {
       cronJobManager.startJobNow(jobKey);
-      response.setResponseCode(OK).setMessage("Cron run started.");
-    } catch (ScheduleException e) {
-      response.setResponseCode(INVALID_REQUEST)
+      return response.setResponseCode(OK).setMessage("Cron run started.");
+    } catch (CronException e) {
+      return response.setResponseCode(INVALID_REQUEST)
           .setMessage("Failed to start cron job - " + e.getMessage());
-    } catch (TaskDescriptionException e) {
-      response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
     }
-
-    return response;
   }
 
   // TODO(William Farner): Provide status information about cron jobs here.
@@ -386,13 +384,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       @Override
       public JobSummary apply(IJobKey jobKey) {
         IJobConfiguration job = jobs.get(jobKey);
-        JobSummary smry = new JobSummary()
+        JobSummary summary = new JobSummary()
             .setJob(job.newBuilder())
             .setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder());
 
         return Strings.isNullOrEmpty(job.getCronSchedule())
-            ? smry
-            : smry.setNextCronRunMs(cronPredictor.predictNextRun(job.getCronSchedule()).getTime());
+            ? summary
+            : summary.setNextCronRunMs(
+                cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime());
       }
     };
 
@@ -826,7 +825,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               jobsByKey(jobStore, existingJob.getKey());
           switch (matches.size()) {
             case 0:
-              error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
+              error = Optional.of(
+                  "No jobs found for key " + JobKeys.canonicalString(existingJob.getKey()));
               break;
 
             case 1:
@@ -834,14 +834,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                   Iterables.getOnlyElement(matches.entries());
               IJobConfiguration storedJob = match.getValue();
               if (!storedJob.equals(existingJob)) {
-                error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
+                error = Optional.of(
+                    "CAS compare failed for " + JobKeys.canonicalString(storedJob.getKey()));
               } else {
                 jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
               }
               break;
 
             default:
-              error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
+              error = Optional.of("Multiple jobs found for key "
+                  + JobKeys.canonicalString(existingJob.getKey()));
           }
         }
         break;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/python/apache/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py
index 1798c40..0cd9246 100644
--- a/src/main/python/apache/aurora/config/thrift.py
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -250,9 +250,6 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
   if unbound:
     raise InvalidConfig('Config contains unbound variables: %s' % ' '.join(map(str, unbound)))
 
-  cron_schedule = not_empty_or(job.cron_schedule(), '')
-  cron_policy = select_cron_policy(job.cron_policy(), job.cron_collision_policy())
-
   task.executorConfig = ExecutorConfig(
       name=AURORA_EXECUTOR_NAME,
       data=filter_aliased_fields(underlying).json_dumps())
@@ -260,7 +257,7 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
   return JobConfiguration(
       key=key,
       owner=owner,
-      cronSchedule=cron_schedule,
-      cronCollisionPolicy=cron_policy,
+      cronSchedule=not_empty_or(job.cron_schedule(), None),
+      cronCollisionPolicy=select_cron_policy(job.cron_policy(), job.cron_collision_policy()),
       taskConfig=task,
       instanceCount=fully_interpolated(job.instances()))