You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/05/15 04:34:44 UTC
[4/5] CronScheduler based on Quartz
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
index 9831012..6d76f60 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
@@ -39,8 +39,8 @@ import com.twitter.common.net.pool.DynamicHostSet;
import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
import com.twitter.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.state.SchedulerCore;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
index efea75f..823668f 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -25,7 +25,6 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.twitter.common.base.Closure;
import com.twitter.common.thrift.Util;
@@ -34,7 +33,7 @@ import org.antlr.stringtemplate.StringTemplate;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.state.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -43,6 +42,8 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.thrift.TBase;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Servlet that prints out the raw configuration for a specified struct.
*/
@@ -50,11 +51,13 @@ import org.apache.thrift.TBase;
public class StructDump extends JerseyTemplateServlet {
private final Storage storage;
+ private final CronJobManager cronJobManager;
@Inject
- public StructDump(Storage storage) {
+ public StructDump(Storage storage, CronJobManager cronJobManager) {
super("structdump");
- this.storage = Preconditions.checkNotNull(storage);
+ this.storage = checkNotNull(storage);
+ this.cronJobManager = checkNotNull(cronJobManager);
}
private static final String USAGE =
@@ -106,11 +109,11 @@ public class StructDump extends JerseyTemplateServlet {
@PathParam("job") final String job) {
final IJobKey jobKey = JobKeys.from(role, environment, job);
- return dumpEntity("Cron job " + JobKeys.toPath(jobKey),
+ return dumpEntity("Cron job " + JobKeys.canonicalString(jobKey),
new Work.Quiet<Optional<? extends TBase<?, ?>>>() {
@Override
public Optional<JobConfiguration> apply(StoreProvider storeProvider) {
- return storeProvider.getJobStore().fetchJob(CronJobManager.MANAGER_KEY, jobKey)
+ return storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(), jobKey)
.transform(IJobConfiguration.TO_BUILDER);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
index ff7e3cb..46be612 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -119,7 +119,7 @@ interface SlaGroup {
return Multimaps.index(tasks, Functions.compose(new Function<IJobKey, String>() {
@Override
public String apply(IJobKey jobKey) {
- return "sla_" + JobKeys.toPath(jobKey) + "_";
+ return "sla_" + JobKeys.canonicalString(jobKey) + "_";
}
}, Tasks.SCHEDULED_TO_JOB_KEY));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
deleted file mode 100644
index 4bd190c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.BackoffHelper;
-
-import org.apache.aurora.gen.CronCollisionPolicy;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ScheduleException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
-import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
-import org.apache.aurora.scheduler.cron.CronException;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.commons.lang.StringUtils;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-
-/**
- * A job scheduler that receives jobs that should be run periodically on a cron schedule.
- */
-public class CronJobManager implements EventSubscriber {
-
- public static final String MANAGER_KEY = "CRON";
-
- @VisibleForTesting
- static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cron");
-
- private static final Logger LOG = Logger.getLogger(CronJobManager.class.getName());
-
- @CmdLine(name = "cron_start_initial_backoff", help =
- "Initial backoff delay while waiting for a previous cron run to start.")
- private static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
- Arg.create(Amount.of(1L, Time.SECONDS));
-
- @CmdLine(name = "cron_start_max_backoff", help =
- "Max backoff delay while waiting for a previous cron run to start.")
- private static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
- Arg.create(Amount.of(1L, Time.MINUTES));
-
- private final AtomicLong cronJobsTriggered = Stats.exportLong("cron_jobs_triggered");
- private final AtomicLong cronJobLaunchFailures = Stats.exportLong("cron_job_launch_failures");
-
- // Maps from the unique job identifier to the unique identifier used internally by the cron
- // scheduler.
- private final Map<IJobKey, String> scheduledJobs =
- Collections.synchronizedMap(Maps.<IJobKey, String>newHashMap());
-
- // Prevents runs from dogpiling while waiting for a run to transition out of the KILLING state.
- // This is necessary because killing a job (if dictated by cron collision policy) is an
- // asynchronous operation.
- private final Map<IJobKey, SanitizedConfiguration> pendingRuns =
- Collections.synchronizedMap(Maps.<IJobKey, SanitizedConfiguration>newHashMap());
-
- private final StateManager stateManager;
- private final Storage storage;
- private final CronScheduler cron;
- private final ShutdownRegistry shutdownRegistry;
- private final BackoffHelper delayedStartBackoff;
- private final Executor delayedRunExecutor;
-
- @Inject
- CronJobManager(
- StateManager stateManager,
- Storage storage,
- CronScheduler cron,
- ShutdownRegistry shutdownRegistry) {
-
- this(
- stateManager,
- storage,
- cron,
- shutdownRegistry,
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CronDelay-%d").build()));
- }
-
- @VisibleForTesting
- CronJobManager(
- StateManager stateManager,
- Storage storage,
- CronScheduler cron,
- ShutdownRegistry shutdownRegistry,
- Executor delayedRunExecutor) {
-
- this.stateManager = checkNotNull(stateManager);
- this.storage = checkNotNull(storage);
- this.cron = checkNotNull(cron);
- this.shutdownRegistry = checkNotNull(shutdownRegistry);
- this.delayedStartBackoff =
- new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get());
- this.delayedRunExecutor = checkNotNull(delayedRunExecutor);
-
- Stats.exportSize("cron_num_pending_runs", pendingRuns);
- }
-
- private void mapScheduledJob(SanitizedCronJob cronJob) throws ScheduleException {
- IJobKey jobKey = cronJob.config.getJobConfig().getKey();
- synchronized (scheduledJobs) {
- Preconditions.checkState(
- !scheduledJobs.containsKey(jobKey),
- "Illegal state - cron schedule already exists for " + JobKeys.toPath(jobKey));
- scheduledJobs.put(jobKey, scheduleJob(cronJob));
- }
- }
-
- /**
- * Notifies the cron job manager that the scheduler is active, and job configurations are ready to
- * load.
- *
- * @param schedulerActive Event.
- */
- @Subscribe
- public void schedulerActive(SchedulerActive schedulerActive) {
- cron.startAsync().awaitRunning();
- shutdownRegistry.addAction(new Command() {
- @Override
- public void execute() {
- // NOTE: We don't know ahead-of-time which thread will execute the shutdown command,
- // so we shouldn't block here.
- cron.stopAsync();
- }
- });
-
- Iterable<IJobConfiguration> crons =
- storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
- @Override
- public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
- return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
- }
- });
-
- for (IJobConfiguration job : crons) {
- try {
- mapScheduledJob(new SanitizedCronJob(job, cron));
- } catch (ScheduleException | TaskDescriptionException e) {
- logLaunchFailure(job, e);
- }
- }
- }
-
- private void logLaunchFailure(IJobConfiguration job, Exception e) {
- cronJobLaunchFailures.incrementAndGet();
- LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
- }
-
- /**
- * Triggers execution of a job.
- *
- * @param jobKey Key of the job to start.
- * @throws ScheduleException If the job could not be started with the cron system.
- * @throws TaskDescriptionException If the stored job associated with {@code jobKey} has field
- * validation problems.
- */
- public void startJobNow(IJobKey jobKey) throws TaskDescriptionException, ScheduleException {
- Optional<IJobConfiguration> jobConfig = fetchJob(jobKey);
- if (!jobConfig.isPresent()) {
- throw new ScheduleException("Cron job does not exist for " + JobKeys.toPath(jobKey));
- }
-
- cronTriggered(new SanitizedCronJob(jobConfig.get(), cron));
- }
-
- private void delayedRun(final Query.Builder query, final SanitizedConfiguration config) {
- IJobConfiguration job = config.getJobConfig();
- final String jobPath = JobKeys.toPath(job);
- final IJobKey jobKey = job.getKey();
- LOG.info("Waiting for job to terminate before launching cron job " + jobPath);
- if (pendingRuns.put(jobKey, config) == null) {
- LOG.info("Launching a task to wait for job to finish: " + jobPath);
- // There was no run already pending for this job, launch a task to delay launch until the
- // existing run has terminated.
- delayedRunExecutor.execute(new Runnable() {
- @Override
- public void run() {
- runWhenTerminated(query, jobKey);
- }
- });
- }
- }
-
- private void runWhenTerminated(final Query.Builder query, final IJobKey jobKey) {
- try {
- delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
- LOG.info("Initiating delayed launch of cron " + jobKey);
- SanitizedConfiguration config = pendingRuns.remove(jobKey);
- checkNotNull(config, "Failed to fetch job for delayed run of " + jobKey);
- LOG.info("Launching " + config.getTaskConfigs().size() + " tasks.");
- stateManager.insertPendingTasks(config.getTaskConfigs());
- return true;
- } else {
- LOG.info("Not yet safe to run cron " + jobKey);
- return false;
- }
- }
- });
- } catch (InterruptedException e) {
- LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + jobKey, e);
- Thread.currentThread().interrupt();
- }
- }
-
- private void killActiveTasks(Set<String> taskIds) {
- if (taskIds.isEmpty()) {
- return;
- }
-
- for (String taskId : taskIds) {
- stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- KILLING,
- KILL_AUDIT_MESSAGE);
- }
- }
-
- public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
- return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
- }
-
- /**
- * Triggers execution of a cron job, depending on the cron collision policy for the job.
- *
- * @param cronJob The job to be triggered.
- */
- private void cronTriggered(SanitizedCronJob cronJob) {
- final SanitizedConfiguration config = cronJob.config;
- final IJobConfiguration job = config.getJobConfig();
- LOG.info(String.format("Cron triggered for %s at %s with policy %s",
- JobKeys.toPath(job), new Date(), job.getCronCollisionPolicy()));
- cronJobsTriggered.incrementAndGet();
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
- ImmutableMap.Builder<Integer, ITaskConfig> builder = ImmutableMap.builder();
- Query.Builder activeQuery = Query.jobScoped(job.getKey()).active();
- Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
-
- if (activeTasks.isEmpty()) {
- builder.putAll(config.getTaskConfigs());
- } else {
- switch (orDefault(job.getCronCollisionPolicy())) {
- case KILL_EXISTING:
- killActiveTasks(activeTasks);
- delayedRun(activeQuery, config);
- break;
-
- case CANCEL_NEW:
- break;
-
- case RUN_OVERLAP:
- LOG.severe(String.format(
- "Ignoring trigger for job %s with deprecated collision"
- + "policy RUN_OVERLAP due to unterminated active tasks.",
- JobKeys.toPath(job)));
- break;
-
- default:
- LOG.severe("Unrecognized cron collision policy: " + job.getCronCollisionPolicy());
- }
- }
-
- Map<Integer, ITaskConfig> newTasks = builder.build();
- if (!newTasks.isEmpty()) {
- stateManager.insertPendingTasks(newTasks);
- }
- }
- });
- }
-
- /**
- * Updates (re-schedules) the existing cron job.
- *
- * @param config New job configuration to update to.
- * @throws ScheduleException If a non-cron job configuration is provided.
- */
- public void updateJob(SanitizedConfiguration config) throws ScheduleException {
- IJobConfiguration job = config.getJobConfig();
- if (!hasCronSchedule(job)) {
- throw new ScheduleException("A cron job may not be updated to a non-cron job.");
- }
- String key = scheduledJobs.remove(job.getKey());
- if (key == null) {
- throw new ScheduleException(
- "No cron template found for the given key: " + JobKeys.toPath(job));
- }
- cron.deschedule(key);
- checkArgument(receiveJob(config));
- }
-
- private static boolean hasCronSchedule(IJobConfiguration job) {
- checkNotNull(job);
- return !StringUtils.isEmpty(job.getCronSchedule());
- }
-
- public boolean receiveJob(SanitizedConfiguration config) throws ScheduleException {
- final IJobConfiguration job = config.getJobConfig();
- if (!hasCronSchedule(job)) {
- return false;
- }
-
- if (CronCollisionPolicy.RUN_OVERLAP.equals(job.getCronCollisionPolicy())) {
- throw new ScheduleException(
- "The RUN_OVERLAP collision policy has been removed (AURORA-38).");
- }
-
- SanitizedCronJob cronJob = new SanitizedCronJob(config, cron);
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
- storeProvider.getJobStore().saveAcceptedJob(MANAGER_KEY, job);
- }
- });
- mapScheduledJob(cronJob);
-
- return true;
- }
-
- private String scheduleJob(final SanitizedCronJob cronJob) throws ScheduleException {
- IJobConfiguration job = cronJob.config.getJobConfig();
- final String jobPath = JobKeys.toPath(job);
- LOG.info(String.format("Scheduling cron job %s: %s", jobPath, job.getCronSchedule()));
- try {
- return cron.schedule(job.getCronSchedule(), new Runnable() {
- @Override
- public void run() {
- // TODO(William Farner): May want to record information about job runs.
- LOG.info("Running cron job: " + jobPath);
- cronTriggered(cronJob);
- }
- });
- } catch (CronException e) {
- throw new ScheduleException("Failed to schedule cron job: " + e.getMessage(), e);
- }
- }
-
- public Iterable<IJobConfiguration> getJobs() {
- return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
- @Override
- public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
- return storeProvider.getJobStore().fetchJobs(MANAGER_KEY);
- }
- });
- }
-
- public boolean hasJob(IJobKey jobKey) {
- return fetchJob(jobKey).isPresent();
- }
-
- private Optional<IJobConfiguration> fetchJob(final IJobKey jobKey) {
- checkNotNull(jobKey);
- return storage.consistentRead(new Work.Quiet<Optional<IJobConfiguration>>() {
- @Override
- public Optional<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
- return storeProvider.getJobStore().fetchJob(MANAGER_KEY, jobKey);
- }
- });
- }
-
- public boolean deleteJob(final IJobKey jobKey) {
- Optional<IJobConfiguration> job = fetchJob(jobKey);
- if (!job.isPresent()) {
- return false;
- }
-
- String scheduledJobKey = scheduledJobs.remove(jobKey);
- if (scheduledJobKey != null) {
- cron.deschedule(scheduledJobKey);
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(Storage.MutableStoreProvider storeProvider) {
- storeProvider.getJobStore().removeJob(jobKey);
- }
- });
- LOG.info("Successfully deleted cron job " + jobKey);
- }
- return true;
- }
-
- private final Function<String, String> keyToSchedule = new Function<String, String>() {
- @Override
- public String apply(String key) {
- return cron.getSchedule(key).or("Not found.");
- }
- };
-
- public Map<IJobKey, String> getScheduledJobs() {
- synchronized (scheduledJobs) {
- return ImmutableMap.copyOf(Maps.transformValues(scheduledJobs, keyToSchedule));
- }
- }
-
- public Set<IJobKey> getPendingRuns() {
- synchronized (pendingRuns) {
- return ImmutableSet.copyOf(pendingRuns.keySet());
- }
- }
-
- /**
- * Used by functions that expect field validation before being called.
- */
- private static class SanitizedCronJob {
- private final SanitizedConfiguration config;
-
- SanitizedCronJob(IJobConfiguration unsanitized, CronScheduler cron)
- throws ScheduleException, TaskDescriptionException {
-
- this(SanitizedConfiguration.fromUnsanitized(unsanitized), cron);
- }
-
- SanitizedCronJob(SanitizedConfiguration config, CronScheduler cron) throws ScheduleException {
- final IJobConfiguration job = config.getJobConfig();
- if (!hasCronSchedule(job)) {
- throw new ScheduleException(
- String.format("Not a valid cronjob, %s has no cron schedule", JobKeys.toPath(job)));
- }
-
- if (!cron.isValidSchedule(job.getCronSchedule())) {
- throw new ScheduleException("Invalid cron schedule: " + job.getCronSchedule());
- }
-
- this.config = config;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index 5696485..e3b5b04 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl implements LockManager {
private static String formatLockKey(ILockKey lockKey) {
switch (lockKey.getSetField()) {
case JOB:
- return JobKeys.toPath(lockKey.getJob());
+ return JobKeys.canonicalString(lockKey.getJob());
default:
return "Unknown lock key type: " + lockKey.getSetField();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index f330599..d377974 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -40,6 +40,9 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.ScheduleException;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.quota.QuotaCheckResult;
import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.storage.Storage;
@@ -49,6 +52,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.commons.lang.StringUtils;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -71,7 +75,7 @@ class SchedulerCoreImpl implements SchedulerCore {
// TODO(wfarner): Consider changing this class to not be concerned with cron jobs, requiring the
// caller to deal with the fork.
- private final CronJobManager cronScheduler;
+ private final CronJobManager cronJobManager;
// State manager handles persistence of task modifications and state transitions.
private final StateManager stateManager;
@@ -83,7 +87,7 @@ class SchedulerCoreImpl implements SchedulerCore {
* Creates a new core scheduler.
*
* @param storage Backing store implementation.
- * @param cronScheduler Cron scheduler.
+ * @param cronJobManager Cron job manager.
* @param stateManager Persistent state manager.
* @param taskIdGenerator Task ID generator.
* @param quotaManager Quota manager.
@@ -91,13 +95,13 @@ class SchedulerCoreImpl implements SchedulerCore {
@Inject
public SchedulerCoreImpl(
Storage storage,
- CronJobManager cronScheduler,
+ CronJobManager cronJobManager,
StateManager stateManager,
TaskIdGenerator taskIdGenerator,
QuotaManager quotaManager) {
this.storage = checkNotNull(storage);
- this.cronScheduler = cronScheduler;
+ this.cronJobManager = cronJobManager;
this.stateManager = checkNotNull(stateManager);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
this.quotaManager = checkNotNull(quotaManager);
@@ -108,7 +112,20 @@ class SchedulerCoreImpl implements SchedulerCore {
storage,
Query.jobScoped(job.getKey()).active()).isEmpty();
- return hasActiveTasks || cronScheduler.hasJob(job.getKey());
+ return hasActiveTasks || cronJobManager.hasJob(job.getKey());
+ }
+
+ private static boolean isCron(SanitizedConfiguration config) {
+ if (!config.getJobConfig().isSetCronSchedule()) {
+ return false;
+ } else if (StringUtils.isEmpty(config.getJobConfig().getCronSchedule())) {
+ // TODO(ksweeney): Remove this in 0.7.0 (AURORA-423).
+ LOG.warning("Got service config with empty string cron schedule. aurora-0.7.x "
+ + "will interpret this as cron job and cause an error.");
+ return false;
+ } else {
+ return true;
+ }
}
@Override
@@ -120,12 +137,19 @@ class SchedulerCoreImpl implements SchedulerCore {
protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
final IJobConfiguration job = sanitizedConfiguration.getJobConfig();
if (hasActiveJob(job)) {
- throw new ScheduleException("Job already exists: " + JobKeys.toPath(job));
+ throw new ScheduleException(
+ "Job already exists: " + JobKeys.canonicalString(job.getKey()));
}
validateTaskLimits(job.getTaskConfig(), job.getInstanceCount());
- if (!cronScheduler.receiveJob(sanitizedConfiguration)) {
+ if (isCron(sanitizedConfiguration)) {
+ try {
+ cronJobManager.createJob(SanitizedCronJob.from(sanitizedConfiguration));
+ } catch (CronException e) {
+ throw new ScheduleException(e);
+ }
+ } else {
LOG.info("Launching " + sanitizedConfiguration.getTaskConfigs().size() + " tasks.");
stateManager.insertPendingTasks(sanitizedConfiguration.getTaskConfigs());
}
@@ -216,7 +240,7 @@ class SchedulerCoreImpl implements SchedulerCore {
// it.
// TODO(maxim): Should be trivial to support killing multiple jobs instead.
IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
- cronScheduler.deleteJob(jobKey);
+ cronJobManager.deleteJob(jobKey);
}
// Unless statuses were specifically supplied, only attempt to kill active tasks.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 7d26082..9db2a1a 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -49,17 +49,10 @@ public class StateModule extends AbstractModule {
bind(LockManager.class).to(LockManagerImpl.class);
bind(LockManagerImpl.class).in(Singleton.class);
- bindCronJobManager(binder());
bindMaintenanceController(binder());
}
@VisibleForTesting
- static void bindCronJobManager(Binder binder) {
- binder.bind(CronJobManager.class).in(Singleton.class);
- PubsubEventModule.bindSubscriber(binder, CronJobManager.class);
- }
-
- @VisibleForTesting
static void bindMaintenanceController(Binder binder) {
binder.bind(MaintenanceController.class).to(MaintenanceControllerImpl.class);
binder.bind(MaintenanceControllerImpl.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 9bb5c25..f101143 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -94,11 +94,14 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.quota.QuotaInfo;
import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
-import org.apache.aurora.scheduler.state.CronJobManager;
import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.state.LockManager.LockException;
import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -242,7 +245,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
schedulerCore.createJob(sanitized);
response.setResponseCode(OK)
.setMessage(String.format("%d new tasks pending for job %s",
- sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job)));
+ sanitized.getJobConfig().getInstanceCount(), JobKeys.canonicalString(job.getKey())));
} catch (LockException e) {
response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
} catch (TaskDescriptionException | ScheduleException e) {
@@ -275,11 +278,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
ILockKey.build(LockKey.job(jobKey.newBuilder())),
Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- cronJobManager.updateJob(SanitizedConfiguration.fromUnsanitized(job));
+ cronJobManager.updateJob(SanitizedCronJob.fromUnsanitized(job));
return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey);
} catch (LockException e) {
return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage());
- } catch (TaskDescriptionException | ScheduleException e) {
+ } catch (CronException | TaskDescriptionException e) {
return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage());
}
}
@@ -314,21 +317,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
try {
sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
} catch (AuthFailedException e) {
- response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
- return response;
+ return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage());
}
try {
cronJobManager.startJobNow(jobKey);
- response.setResponseCode(OK).setMessage("Cron run started.");
- } catch (ScheduleException e) {
- response.setResponseCode(INVALID_REQUEST)
+ return response.setResponseCode(OK).setMessage("Cron run started.");
+ } catch (CronException e) {
+ return response.setResponseCode(INVALID_REQUEST)
.setMessage("Failed to start cron job - " + e.getMessage());
- } catch (TaskDescriptionException e) {
- response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage());
}
-
- return response;
}
// TODO(William Farner): Provide status information about cron jobs here.
@@ -386,13 +384,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
@Override
public JobSummary apply(IJobKey jobKey) {
IJobConfiguration job = jobs.get(jobKey);
- JobSummary smry = new JobSummary()
+ JobSummary summary = new JobSummary()
.setJob(job.newBuilder())
.setStats(Jobs.getJobStats(tasks.get(jobKey)).newBuilder());
return Strings.isNullOrEmpty(job.getCronSchedule())
- ? smry
- : smry.setNextCronRunMs(cronPredictor.predictNextRun(job.getCronSchedule()).getTime());
+ ? summary
+ : summary.setNextCronRunMs(
+ cronPredictor.predictNextRun(CrontabEntry.parse(job.getCronSchedule())).getTime());
}
};
@@ -826,7 +825,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
jobsByKey(jobStore, existingJob.getKey());
switch (matches.size()) {
case 0:
- error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob));
+ error = Optional.of(
+ "No jobs found for key " + JobKeys.canonicalString(existingJob.getKey()));
break;
case 1:
@@ -834,14 +834,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
Iterables.getOnlyElement(matches.entries());
IJobConfiguration storedJob = match.getValue();
if (!storedJob.equals(existingJob)) {
- error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob));
+ error = Optional.of(
+ "CAS compare failed for " + JobKeys.canonicalString(storedJob.getKey()));
} else {
jobStore.saveAcceptedJob(match.getKey(), rewrittenJob);
}
break;
default:
- error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob));
+ error = Optional.of("Multiple jobs found for key "
+ + JobKeys.canonicalString(existingJob.getKey()));
}
}
break;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3361dbed/src/main/python/apache/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py
index 1798c40..0cd9246 100644
--- a/src/main/python/apache/aurora/config/thrift.py
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -250,9 +250,6 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
if unbound:
raise InvalidConfig('Config contains unbound variables: %s' % ' '.join(map(str, unbound)))
- cron_schedule = not_empty_or(job.cron_schedule(), '')
- cron_policy = select_cron_policy(job.cron_policy(), job.cron_collision_policy())
-
task.executorConfig = ExecutorConfig(
name=AURORA_EXECUTOR_NAME,
data=filter_aliased_fields(underlying).json_dumps())
@@ -260,7 +257,7 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
return JobConfiguration(
key=key,
owner=owner,
- cronSchedule=cron_schedule,
- cronCollisionPolicy=cron_policy,
+ cronSchedule=not_empty_or(job.cron_schedule(), None),
+ cronCollisionPolicy=select_cron_policy(job.cron_policy(), job.cron_collision_policy()),
taskConfig=task,
instanceCount=fully_interpolated(job.instances()))