You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/04/23 20:45:37 UTC
[5/5] git commit: AURORA-132: Cron system based on Quartz
AURORA-132: Cron system based on Quartz
This introduces a new cron system based on Quartz and removes the
NoopCronModule.
Testing Done:
./gradlew build
Bugs closed: AURORA-132
Reviewed at https://reviews.apache.org/r/19767/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c285f2f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c285f2f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c285f2f8
Branch: refs/heads/master
Commit: c285f2f83dc769143bc4a3be67e3d53b8fc51418
Parents: 760e5d3
Author: Kevin Sweeney <ke...@apache.org>
Authored: Wed Apr 23 11:45:15 2014 -0700
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Wed Apr 23 11:45:15 2014 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../aurora/scheduler/MesosTaskFactory.java | 2 +-
.../aurora/scheduler/app/SchedulerMain.java | 20 +-
.../aurora/scheduler/async/TaskGroups.java | 2 +-
.../apache/aurora/scheduler/base/JobKeys.java | 40 +-
.../configuration/ConfigurationManager.java | 12 +-
.../aurora/scheduler/cron/CronJobManager.java | 97 +
.../aurora/scheduler/cron/CronPredictor.java | 2 +-
.../aurora/scheduler/cron/CronScheduler.java | 40 +-
.../aurora/scheduler/cron/CrontabEntryTest.java | 163 -
.../aurora/scheduler/cron/SanitizedCronJob.java | 131 +
.../scheduler/cron/noop/NoopCronModule.java | 40 -
.../scheduler/cron/noop/NoopCronPredictor.java | 33 -
.../scheduler/cron/noop/NoopCronScheduler.java | 83 -
.../scheduler/cron/quartz/AuroraCronJob.java | 231 ++
.../cron/quartz/AuroraCronJobFactory.java | 49 +
.../cron/quartz/CronJobManagerImpl.java | 256 ++
.../scheduler/cron/quartz/CronLifecycle.java | 114 +
.../scheduler/cron/quartz/CronModule.java | 130 +
.../cron/quartz/CronPredictorImpl.java | 46 +
.../cron/quartz/CronSchedulerImpl.java | 71 +
.../aurora/scheduler/cron/quartz/Quartz.java | 124 +
.../scheduler/cron/testing/AbstractCronIT.java | 135 -
.../org/apache/aurora/scheduler/http/Cron.java | 3 +-
.../aurora/scheduler/http/ServletModule.java | 2 +-
.../aurora/scheduler/http/StructDump.java | 15 +-
.../aurora/scheduler/state/CronJobManager.java | 484 ---
.../aurora/scheduler/state/LockManagerImpl.java | 2 +-
.../scheduler/state/SchedulerCoreImpl.java | 30 +-
.../aurora/scheduler/state/StateModule.java | 7 -
.../thrift/SchedulerThriftInterface.java | 40 +-
.../cron/testing/cron-schedule-predictions.json | 3332 ------------------
.../aurora/scheduler/cron/CrontabEntryTest.java | 168 +
.../scheduler/cron/ExpectedPrediction.java | 55 +
.../aurora/scheduler/cron/noop/NoopCronIT.java | 63 -
.../cron/quartz/AuroraCronJobTest.java | 174 +
.../aurora/scheduler/cron/quartz/CronIT.java | 257 ++
.../cron/quartz/CronJobManagerImplTest.java | 221 ++
.../cron/quartz/CronPredictorImplTest.java | 89 +
.../scheduler/cron/quartz/QuartzTestUtil.java | 78 +
.../state/BaseSchedulerCoreImplTest.java | 344 +-
.../scheduler/state/CronJobManagerTest.java | 490 ---
.../scheduler/state/LockManagerImplTest.java | 2 +-
.../thrift/SchedulerThriftInterfaceTest.java | 17 +-
.../aurora/scheduler/thrift/ThriftIT.java | 4 +-
.../scheduler/cron/expected-predictions.json | 3332 ++++++++++++++++++
46 files changed, 5808 insertions(+), 5223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 459cd85..831d8b3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -163,6 +163,7 @@ dependencies {
compile 'org.apache.mesos:mesos:0.17.0'
compile thriftLib
compile 'org.apache.zookeeper:zookeeper:3.3.4'
+ compile 'org.quartz-scheduler:quartz:2.2.1'
compile "org.slf4j:slf4j-api:${slf4jRev}"
compile "org.slf4j:slf4j-jdk14:${slf4jRev}"
compile 'com.twitter.common.logging:log4j:0.0.7'
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
index 86bbc29..bdd8c19 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
@@ -135,7 +135,7 @@ public interface MesosTaskFactory {
}
TaskInfo.Builder taskBuilder =
TaskInfo.newBuilder()
- .setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+ .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
.setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
.setSlaveId(slaveId)
.addAllResources(resources)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index bf3d7a3..da6f5e5 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -59,9 +59,7 @@ import org.apache.aurora.scheduler.DriverFactory;
import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
import org.apache.aurora.scheduler.SchedulerLifecycle;
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.cron.noop.NoopCronModule;
+import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.local.IsolatedSchedulerModule;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
@@ -115,17 +113,7 @@ public class SchedulerMain extends AbstractApplication {
.add(CapabilityValidator.class)
.build();
- @CmdLine(name = "cron_module",
- help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
- private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
- Arg.create(NoopCronModule.class);
-
- private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
- .add(CronPredictor.class)
- .add(CronScheduler.class)
- .build();
-
- // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
+ // TODO(Suman Karumuri): Pass in AUTH as extra module
@CmdLine(name = "extra_modules",
help = "A list of modules that provide additional functionality.")
private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
@@ -151,8 +139,7 @@ public class SchedulerMain extends AbstractApplication {
private static Iterable<? extends Module> getExtraModules() {
Builder<Module> modules = ImmutableList.builder();
- modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
- .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
+ modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES));
for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
modules.add(Modules.getModule(moduleClass));
@@ -173,6 +160,7 @@ public class SchedulerMain extends AbstractApplication {
.addAll(getExtraModules())
.add(new LogStorageModule())
.add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
+ .add(new CronModule())
.add(new ThriftModule())
.add(new ThriftAuthModule())
.build();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 6aff091..ef73032 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -243,7 +243,7 @@ public class TaskGroups implements EventSubscriber {
@Override
public String toString() {
- return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
+ return JobKeys.canonicalString(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
index db1bec4..c81ac62 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/JobKeys.java
@@ -15,17 +15,20 @@
*/
package org.apache.aurora.scheduler.base;
+import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import com.google.common.base.Function;
import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
-import com.google.common.base.Strings;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -83,9 +86,9 @@ public final class JobKeys {
*/
public static boolean isValid(@Nullable IJobKey jobKey) {
return jobKey != null
- && !Strings.isNullOrEmpty(jobKey.getRole())
- && !Strings.isNullOrEmpty(jobKey.getEnvironment())
- && !Strings.isNullOrEmpty(jobKey.getName());
+ && ConfigurationManager.isGoodIdentifier(jobKey.getRole())
+ && ConfigurationManager.isGoodIdentifier(jobKey.getEnvironment())
+ && ConfigurationManager.isGoodIdentifier(jobKey.getName());
}
/**
@@ -132,25 +135,32 @@ public final class JobKeys {
}
/**
- * Create a "/"-delimited String representation of a job key, suitable for logging but not
- * necessarily suitable for use as a unique identifier.
+ * Create a "/"-delimited representation of job key usable as a unique identifier in this cluster.
*
+ * It is guaranteed that {@code k.equals(JobKeys.parse(JobKeys.canonicalString(k)))}.
+ *
+ * @see #parse(String)
* @param jobKey Key to represent.
- * @return "/"-delimited representation of the key.
+ * @return Canonical "/"-delimited representation of the key.
*/
- public static String toPath(IJobKey jobKey) {
- return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
+ public static String canonicalString(IJobKey jobKey) {
+ return Joiner.on("/").join(jobKey.getRole(), jobKey.getEnvironment(), jobKey.getName());
}
/**
- * Create a "/"-delimited String representation of job key, suitable for logging but not
- * necessarily suitable for use as a unique identifier.
+ * Create a job key from a "role/environment/name" representation.
+ *
+ * It is guaranteed that {@code k.equals(JobKeys.parse(JobKeys.canonicalString(k)))}.
*
- * @param job Job to represent.
- * @return "/"-delimited representation of the job's key.
+ * @see #canonicalString(IJobKey)
+ * @param string Input to parse.
+ * @return Parsed representation.
+ * @throws IllegalArgumentException when the string fails to parse.
*/
- public static String toPath(IJobConfiguration job) {
- return toPath(job.getKey());
+ public static IJobKey parse(String string) throws IllegalArgumentException {
+ List<String> components = Splitter.on("/").splitToList(string);
+ checkArgument(components.size() == 3);
+ return from(components.get(0), components.get(1), components.get(2));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index 82034e0..e5ad461 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -164,9 +164,15 @@ public final class ConfigurationManager {
// Utility class.
}
- @VisibleForTesting
- static boolean isGoodIdentifier(String identifier) {
- return GOOD_IDENTIFIER.matcher(identifier).matches()
+ /**
+ * Verifies that an identifier is an acceptable name component.
+ *
+ * @param identifier Identifier to check.
+ * @return false if the identifier is null or invalid.
+ */
+ public static boolean isGoodIdentifier(@Nullable String identifier) {
+ return identifier != null
+ && GOOD_IDENTIFIER.matcher(identifier).matches()
&& (identifier.length() <= MAX_IDENTIFIER_LENGTH);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
new file mode 100644
index 0000000..7c8d5ec
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronJobManager.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron;
+
+import java.util.Map;
+
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Manages the persistence and scheduling of jobs that should be run periodically on a cron
+ * schedule.
+ */
+public interface CronJobManager {
+ /**
+ * Triggers execution of a job.
+ *
+ * @param jobKey Key of the job to start.
+ * @throws CronException If the job could not be started with the cron system.
+ */
+ void startJobNow(IJobKey jobKey) throws CronException;
+
+ /**
+ * Update an existing cron job in storage and schedule it for future execution.
+ *
+ * @param config Cron job configuration to update to.
+ * @throws CronException If a job with the same key does not exist or the job could not be
+ * scheduled.
+ */
+ void updateJob(SanitizedCronJob config) throws CronException;
+
+ /**
+ * Persist a cron job to storage and schedule it for future execution.
+ *
+ * @param config New cron job configuration.
+ * @throws CronException If a job with the same key exists or the job could not be scheduled.
+ */
+ void createJob(SanitizedCronJob config) throws CronException;
+
+ /**
+ * Get all cron jobs.
+ *
+ * TODO(ksweeney): Consider deprecating this and letting caller query storage directly.
+ *
+ * @return An immutable snapshot of cron jobs at some instant.
+ */
+ Iterable<IJobConfiguration> getJobs();
+
+ /**
+ * Test whether a job exists.
+ *
+ * TODO(ksweeney): Consider deprecating this and letting caller query storage directly.
+ *
+ * @param jobKey Key of the job to check.
+ * @return false when a job does not exist in storage.
+ */
+ boolean hasJob(IJobKey jobKey);
+
+ /**
+ * Remove a job and deschedule it.
+ *
+ * @param jobKey Key of the job to delete.
+ * @return true if a job was removed.
+ */
+ boolean deleteJob(IJobKey jobKey);
+
+ /**
+ * A list of the currently scheduled jobs and when they will run according to the underlying
+ * execution engine.
+ *
+ * @return A map from job to the cron schedule in use for that job.
+ */
+ Map<IJobKey, CrontabEntry> getScheduledJobs();
+
+ /**
+ * The unique ID of this cron job manager, used as a prefix in the JobStore.
+ *
+ * TODO(ksweeney): Consider removing this from storage entirely since the JobManager abstraction
+ * is gone.
+ *
+ * @return The unique ID of the manager.
+ */
+ String getManagerKey();
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java b/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
index df0c378..0ce60f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronPredictor.java
@@ -27,5 +27,5 @@ public interface CronPredictor {
* @param schedule Cron schedule to predict the next time for.
* @return A prediction for the next time a cron will run.
*/
- Date predictNextRun(String schedule);
+ Date predictNextRun(CrontabEntry schedule);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
index 56e9950..f38dea5 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/CronScheduler.java
@@ -15,49 +15,19 @@
*/
package org.apache.aurora.scheduler.cron;
-import javax.annotation.Nullable;
-
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Service;
+
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
/**
* An execution manager that executes work on a cron schedule.
*/
-public interface CronScheduler extends Service {
- /**
- * Schedules a task on a cron schedule.
- *
- * @param schedule Cron-style schedule.
- * @param task Work to run when on the cron schedule.
- * @return A unique ID to identify the scheduled cron task.
- * @throws CronException when there was a failure to schedule, for example if {@code schedule}
- * is not a valid input.
- * @throws IllegalStateException If the cron scheduler is not currently running.
- */
- String schedule(String schedule, Runnable task) throws CronException, IllegalStateException;
-
- /**
- * Removes a scheduled cron item.
- *
- * @param key Key previously returned from {@link #schedule(String, Runnable)}.
- * @throws IllegalStateException If the cron scheduler is not currently running.
- */
- void deschedule(String key) throws IllegalStateException;
-
+public interface CronScheduler {
/**
* Gets the cron schedule associated with a scheduling key.
*
- * @param key Key previously returned from {@link #schedule(String, Runnable)}.
+ * @param key Key of the job whose cron schedule should be returned.
* @return The task's cron schedule, if a matching task was found.
- * @throws IllegalStateException If the cron scheduler is not currently running.
- */
- Optional<String> getSchedule(String key) throws IllegalStateException;
-
- /**
- * Checks to see if the scheduler would be accepted by the underlying scheduler.
- *
- * @param schedule Cron scheduler to validate.
- * @return {@code true} if the schedule is valid.
*/
- boolean isValidSchedule(@Nullable String schedule);
+ Optional<CrontabEntry> getSchedule(IJobKey key);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java b/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
deleted file mode 100644
index 2bb848a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/CrontabEntryTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron;
-
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class CrontabEntryTest {
- @Test
- public void testHashCodeAndEquals() {
-
- List<CrontabEntry> entries = ImmutableList.of(
- CrontabEntry.parse("* * * * *"),
- CrontabEntry.parse("0-59 * * * *"),
- CrontabEntry.parse("0-57,58,59 * * * *"),
- CrontabEntry.parse("* 23,1,2,4,0-22 * * *"),
- CrontabEntry.parse("1-50,0,51-59 * * * sun-sat"));
-
- for (CrontabEntry lhs : entries) {
- for (CrontabEntry rhs : entries) {
- assertEquals(lhs, rhs);
- }
- }
-
- Set<CrontabEntry> equivalentEntries = Sets.newHashSet(entries);
- assertTrue(equivalentEntries.size() == 1);
- }
-
- @Test
- public void testEqualsCoverage() {
- assertNotEquals(CrontabEntry.parse("* * * * *"), new Object());
-
- assertNotEquals(CrontabEntry.parse("* * * * *"), CrontabEntry.parse("1 * * * *"));
- assertEquals(CrontabEntry.parse("1,2,3 * * * *"), CrontabEntry.parse("1-3 * * * *"));
-
- assertNotEquals(CrontabEntry.parse("* 0-22 * * *"), CrontabEntry.parse("* * * * *"));
- assertEquals(CrontabEntry.parse("* 0-23 * * *"), CrontabEntry.parse("* * * * *"));
-
- assertNotEquals(CrontabEntry.parse("1 1 1-30 * *"), CrontabEntry.parse("1 1 * * *"));
- assertEquals(CrontabEntry.parse("1 1 1-31 * *"), CrontabEntry.parse("1 1 * * *"));
-
- assertNotEquals(CrontabEntry.parse("1 1 * JAN,FEB-NOV *"), CrontabEntry.parse("1 1 * * *"));
- assertEquals(CrontabEntry.parse("1 1 * JAN,FEB-DEC *"), CrontabEntry.parse("1 1 * * *"));
-
- assertNotEquals(CrontabEntry.parse("* * * * SUN"), CrontabEntry.parse("* * * * SAT"));
- assertEquals(CrontabEntry.parse("* * * * 0"), CrontabEntry.parse("* * * * SUN"));
- }
-
- @Test
- public void testSkip() {
- assertEquals(CrontabEntry.parse("*/15 * * * *"), CrontabEntry.parse("0,15,30,45 * * * *"));
- assertEquals(
- CrontabEntry.parse("* */2 * * *"),
- CrontabEntry.parse("0-59 0,2,4,6,8,10,12-23/2 * * *"));
- }
-
- @Test
- public void testToString() {
- assertEquals("0-58 * * * *", CrontabEntry.parse("0,1-57,58 * * * *").toString());
- assertEquals("* * * * *", CrontabEntry.parse("* * * * *").toString());
- }
-
- @Test
- public void testWildcards() {
- CrontabEntry wildcardMinuteEntry = CrontabEntry.parse("* 1 1 1 *");
- assertEquals("*", wildcardMinuteEntry.getMinuteAsString());
- assertTrue(wildcardMinuteEntry.hasWildcardMinute());
- assertFalse(wildcardMinuteEntry.hasWildcardHour());
- assertFalse(wildcardMinuteEntry.hasWildcardDayOfMonth());
- assertFalse(wildcardMinuteEntry.hasWildcardMonth());
- assertTrue(wildcardMinuteEntry.hasWildcardDayOfWeek());
-
- CrontabEntry wildcardHourEntry = CrontabEntry.parse("1 * 1 1 *");
- assertEquals("*", wildcardHourEntry.getHourAsString());
- assertFalse(wildcardHourEntry.hasWildcardMinute());
- assertTrue(wildcardHourEntry.hasWildcardHour());
- assertFalse(wildcardHourEntry.hasWildcardDayOfMonth());
- assertFalse(wildcardHourEntry.hasWildcardMonth());
- assertTrue(wildcardHourEntry.hasWildcardDayOfWeek());
-
- CrontabEntry wildcardDayOfMonth = CrontabEntry.parse("1 1 * 1 *");
- assertEquals("*", wildcardDayOfMonth.getDayOfMonthAsString());
- assertFalse(wildcardDayOfMonth.hasWildcardMinute());
- assertFalse(wildcardDayOfMonth.hasWildcardHour());
- assertTrue(wildcardDayOfMonth.hasWildcardDayOfMonth());
- assertFalse(wildcardDayOfMonth.hasWildcardMonth());
- assertTrue(wildcardDayOfMonth.hasWildcardDayOfWeek());
-
- CrontabEntry wildcardMonth = CrontabEntry.parse("1 1 1 * *");
- assertEquals("*", wildcardMonth.getMonthAsString());
- assertFalse(wildcardMonth.hasWildcardMinute());
- assertFalse(wildcardMonth.hasWildcardHour());
- assertFalse(wildcardMonth.hasWildcardDayOfMonth());
- assertTrue(wildcardMonth.hasWildcardMonth());
- assertTrue(wildcardMonth.hasWildcardDayOfWeek());
-
- CrontabEntry wildcardDayOfWeek = CrontabEntry.parse("1 1 1 1 *");
- assertEquals("*", wildcardDayOfWeek.getDayOfWeekAsString());
- assertFalse(wildcardDayOfWeek.hasWildcardMinute());
- assertFalse(wildcardDayOfWeek.hasWildcardHour());
- assertFalse(wildcardDayOfWeek.hasWildcardDayOfMonth());
- assertFalse(wildcardDayOfWeek.hasWildcardMonth());
- assertTrue(wildcardDayOfWeek.hasWildcardDayOfWeek());
- }
-
- @Test
- public void testEqualsIsCanonical() {
- String rawEntry = "* * */3 * *";
- CrontabEntry input = CrontabEntry.parse(rawEntry);
- assertNotEquals(
- rawEntry + " is not the canonical form of " + input,
- rawEntry,
- input.toString());
- assertEquals(
- "The form returned by toString is canonical",
- input.toString(),
- CrontabEntry.parse(input.toString()).toString());
- }
-
- @Test
- public void testBadEntries() {
- List<String> badPatterns = ImmutableList.of(
- "* * * * MON-SUN",
- "* * **",
- "0-59 0-59 * * *",
- "1/1 * * * *",
- "5 5 * MAR-JAN *",
- "*/0 * * * *",
- "0-59/0 * * * *",
- "0-59/60 * * * *",
- "* * * *, *",
- "* * 1 * 1"
- );
-
- for (String pattern : badPatterns) {
- assertNull(CrontabEntry.tryParse(pattern).orNull());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
new file mode 100644
index 0000000..0082932
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/SanitizedCronJob.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.commons.lang.StringUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Used by functions that expect field validation before being called.
+ */
+public final class SanitizedCronJob {
+ private final SanitizedConfiguration config;
+ private final CrontabEntry crontabEntry;
+
+ private SanitizedCronJob(IJobConfiguration unsanitized)
+ throws CronException, ConfigurationManager.TaskDescriptionException {
+
+ this(SanitizedConfiguration.fromUnsanitized(unsanitized));
+ }
+
+ private SanitizedCronJob(SanitizedConfiguration config) throws CronException {
+ final IJobConfiguration job = config.getJobConfig();
+ if (!hasCronSchedule(job)) {
+ throw new CronException(String.format(
+ "Not a valid cron job, %s has no cron schedule", JobKeys.canonicalString(job.getKey())));
+ }
+
+ Optional<CrontabEntry> entry = CrontabEntry.tryParse(job.getCronSchedule());
+ if (!entry.isPresent()) {
+ throw new CronException("Invalid cron schedule: " + job.getCronSchedule());
+ }
+
+ this.config = config;
+ this.crontabEntry = entry.get();
+ }
+
+ /**
+ * Get the default cron collision policy.
+ *
+ * @param policy A (possibly null) policy.
+ * @return The given policy or a default if the policy was null.
+ */
+ public static CronCollisionPolicy orDefault(@Nullable CronCollisionPolicy policy) {
+ return Optional.fromNullable(policy).or(CronCollisionPolicy.KILL_EXISTING);
+ }
+
+ /**
+ * Create a SanitizedCronJob from a SanitizedConfiguration. SanitizedCronJob performs additional
+ * validation to ensure that the provided job contains all properties needed to run it on a
+ * cron schedule.
+ *
+ * @param config Config to validate.
+ * @return Config wrapped in defaults.
+ * @throws CronException If a cron-specific validation error occurred.
+ */
+ public static SanitizedCronJob from(SanitizedConfiguration config)
+ throws CronException {
+
+ return new SanitizedCronJob(config);
+ }
+
+ /**
+ * Create a cron job from an unsanitized input job. Suitable for RPC input validation.
+ *
+ * @param unsanitized Unsanitized input job.
+ * @return A sanitized job if all validation succeeds.
+ * @throws CronException If validation fails with a cron-specific error.
+ * @throws ConfigurationManager.TaskDescriptionException If validation fails with a non
+ * cron-specific error.
+ */
+ public static SanitizedCronJob fromUnsanitized(IJobConfiguration unsanitized)
+ throws CronException, ConfigurationManager.TaskDescriptionException {
+
+ return new SanitizedCronJob(unsanitized);
+ }
+
+ /**
+ * Get this job's cron collision policy.
+ *
+ * @return This job's cron collision policy.
+ */
+ public CronCollisionPolicy getCronCollisionPolicy() {
+ return orDefault(config.getJobConfig().getCronCollisionPolicy());
+ }
+
+ private static boolean hasCronSchedule(IJobConfiguration job) {
+ checkNotNull(job);
+ return !StringUtils.isEmpty(job.getCronSchedule());
+ }
+
+ /**
+ * Returns the cron schedule associated with this job.
+ *
+ * @return The cron schedule associated with this job.
+ */
+ public CrontabEntry getCrontabEntry() {
+ return crontabEntry;
+ }
+
+ /**
+ * Returns the sanitized job configuration associated with the cron job.
+ *
+ * @return This cron job's sanitized job configuration.
+ */
+ public SanitizedConfiguration getSanitizedConfig() {
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
deleted file mode 100644
index e0935f5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronModule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-
-/**
- * A Module to wire up a cron scheduler that does not actually schedule cron jobs.
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-public class NoopCronModule extends AbstractModule {
- @Override
- protected void configure() {
- bind(CronScheduler.class).to(NoopCronScheduler.class);
- bind(NoopCronScheduler.class).in(Singleton.class);
-
- bind(CronPredictor.class).to(NoopCronPredictor.class);
- bind(NoopCronPredictor.class).in(Singleton.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
deleted file mode 100644
index 7b25152..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronPredictor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import java.util.Date;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-
-/**
- * A cron predictor that always suggests that the next run is Unix epoch time.
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-class NoopCronPredictor implements CronPredictor {
- @Override
- public Date predictNextRun(String schedule) {
- return new Date(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java b/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
deleted file mode 100644
index a31551c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/noop/NoopCronScheduler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.noop;
-
-import java.util.Collections;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.aurora.scheduler.cron.CronScheduler;
-
-/**
- * A cron scheduler that accepts cron jobs but never runs them. Useful if you want to hook up an
- * external triggering mechanism (e.g. a system cron job that calls the startCronJob RPC manually
- * on an interval).
- *
- * This class exists as a short term hack to get around a license compatibility issue - Real
- * Implementation (TM) coming soon.
- */
-class NoopCronScheduler extends AbstractIdleService implements CronScheduler {
- private static final Logger LOG = Logger.getLogger(NoopCronScheduler.class.getName());
-
- // Keep a list of schedules we've seen.
- private final Set<String> schedules = Collections.synchronizedSet(Sets.<String>newHashSet());
-
- @Override
- public void startUp() throws Exception {
- LOG.warning("NO-OP cron scheduler is in use. Cron jobs submitted will not be triggered!");
- }
-
- @Override
- public void shutDown() {
- // No-op.
- }
-
- @Override
- public String schedule(String schedule, Runnable task) {
- schedules.add(schedule);
-
- LOG.warning(String.format(
- "NO-OP cron scheduler is in use! %s with schedule %s WILL NOT be automatically triggered!",
- task,
- schedule));
-
- return schedule;
- }
-
- @Override
- public void deschedule(String key) throws IllegalStateException {
- schedules.remove(key);
- }
-
- @Override
- public Optional<String> getSchedule(String key) throws IllegalStateException {
- return schedules.contains(key)
- ? Optional.of(key)
- : Optional.<String>absent();
- }
-
- @Override
- public boolean isValidSchedule(@Nullable String schedule) {
- // Accept everything.
- return schedule != null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
new file mode 100644
index 0000000..fc02264
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -0,0 +1,231 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.common.base.Supplier;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+
+/**
+ * Encapsulates the logic behind a single trigger of a single job key. Multiple executions may run
+ * concurrently but only a single instance will be active at a time per job key.
+ *
+ * <p>
+ * Executions may block for long periods of time when waiting for a kill to complete. The Quartz
+ * scheduler should therefore be configured with a large number of threads.
+ */
+@DisallowConcurrentExecution
+class AuroraCronJob implements Job {
+  private static final Logger LOG = Logger.getLogger(AuroraCronJob.class.getName());
+
+  // Exported counters: one per trigger outcome observed in doExecute.
+  private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
+  private static final AtomicLong CRON_JOB_MISFIRES = Stats.exportLong("cron_job_misfires");
+  private static final AtomicLong CRON_JOB_PARSE_FAILURES =
+      Stats.exportLong("cron_job_parse_failures");
+  private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+
+  @VisibleForTesting
+  static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
+
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final CronJobManager cronJobManager;
+  private final BackoffHelper delayedStartBackoff;
+
+  @Inject
+  AuroraCronJob(
+      Config config,
+      Storage storage,
+      StateManager stateManager,
+      CronJobManager cronJobManager) {
+
+    this.storage = checkNotNull(storage);
+    this.stateManager = checkNotNull(stateManager);
+    this.cronJobManager = checkNotNull(cronJobManager);
+    this.delayedStartBackoff = checkNotNull(config.getDelayedStartBackoff());
+  }
+
+  /**
+   * A launch that has to wait: the tasks to insert once the listed active tasks have gone away.
+   */
+  private static final class DeferredLaunch {
+    private final Map<Integer, ITaskConfig> pendingTasks;
+    private final Set<String> activeTaskIds;
+
+    private DeferredLaunch(Map<Integer, ITaskConfig> pendingTasks, Set<String> activeTaskIds) {
+      this.pendingTasks = pendingTasks;
+      this.activeTaskIds = activeTaskIds;
+    }
+  }
+
+  /**
+   * Quartz entry point. Verifies that concurrent execution is disallowed for the job detail and
+   * then runs the trigger for the Aurora job key derived from the Quartz job key.
+   *
+   * @param context Quartz execution context carrying the job detail.
+   * @throws JobExecutionException If the execution is interrupted while waiting to launch.
+   */
+  @Override
+  public void execute(JobExecutionContext context) throws JobExecutionException {
+    // We assume quartz prevents concurrent runs of this job for a given job key. This allows us
+    // to avoid races where we might kill another run's tasks.
+    // NOTE: "Exection" is the spelling of the Quartz API method, not a typo introduced here.
+    checkState(context.getJobDetail().isConcurrentExectionDisallowed());
+
+    doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+  }
+
+  /**
+   * Runs one trigger for {@code key}.
+   *
+   * <p>
+   * Within a single storage write the stored job is looked up and re-validated. If the job is
+   * missing or invalid the trigger is dropped and the matching counter is incremented. If the job
+   * has no active tasks its tasks are inserted as pending immediately. If active tasks exist, only
+   * {@code KILL_EXISTING} proceeds: the active tasks are moved to {@code KILLING} and the calling
+   * thread retries with backoff until they are gone, at which point the new tasks are inserted.
+   * Every other collision policy drops the trigger.
+   *
+   * @param key Job key whose cron trigger fired.
+   * @throws JobExecutionException If the thread is interrupted while waiting for the old tasks to
+   *     terminate; the interrupt flag is restored before throwing.
+   */
+  @VisibleForTesting
+  void doExecute(final IJobKey key) throws JobExecutionException {
+    final String path = JobKeys.canonicalString(key);
+
+    final Optional<DeferredLaunch> deferredLaunch = storage.write(
+        new Storage.MutateWork.Quiet<Optional<DeferredLaunch>>() {
+          @Override
+          public Optional<DeferredLaunch> apply(Storage.MutableStoreProvider storeProvider) {
+            Optional<IJobConfiguration> config =
+                storeProvider.getJobStore().fetchJob(cronJobManager.getManagerKey(), key);
+            if (!config.isPresent()) {
+              LOG.warning(String.format(
+                  "Cron was triggered for %s but no job with that key was found in storage.",
+                  path));
+              CRON_JOB_MISFIRES.incrementAndGet();
+              return Optional.absent();
+            }
+
+            SanitizedCronJob cronJob;
+            try {
+              cronJob = SanitizedCronJob.fromUnsanitized(config.get());
+            } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+              // Log the canonical path like every other message in this method.
+              LOG.warning(String.format(
+                  "Invalid cron job for %s in storage - failed to parse with %s", path, e));
+              CRON_JOB_PARSE_FAILURES.incrementAndGet();
+              return Optional.absent();
+            }
+
+            CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+            LOG.info(String.format(
+                "Cron triggered for %s at %s with policy %s", path, new Date(), collisionPolicy));
+            CRON_JOB_TRIGGERS.incrementAndGet();
+
+            ImmutableMap<Integer, ITaskConfig> pendingTasks =
+                ImmutableMap.copyOf(cronJob.getSanitizedConfig().getTaskConfigs());
+
+            final Query.Builder activeQuery = Query.jobScoped(key).active();
+            Set<String> activeTasks =
+                Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+
+            // No collision: launch straight away, nothing is left to do after the transaction.
+            if (activeTasks.isEmpty()) {
+              stateManager.insertPendingTasks(pendingTasks);
+              return Optional.absent();
+            }
+
+            CRON_JOB_COLLISIONS.incrementAndGet();
+            switch (collisionPolicy) {
+              case KILL_EXISTING:
+                // Killing and relaunching happens after this transaction has finished.
+                return Optional.of(new DeferredLaunch(pendingTasks, activeTasks));
+
+              case RUN_OVERLAP:
+                LOG.severe(String.format("Ignoring trigger for job %s with deprecated collision "
+                    + "policy RUN_OVERLAP due to unterminated active tasks.", path));
+                return Optional.absent();
+
+              case CANCEL_NEW:
+                return Optional.absent();
+
+              default:
+                LOG.severe("Unrecognized cron collision policy: " + collisionPolicy);
+                return Optional.absent();
+            }
+          }
+        }
+    );
+
+    if (!deferredLaunch.isPresent()) {
+      return;
+    }
+
+    for (String taskId : deferredLaunch.get().activeTaskIds) {
+      stateManager.changeState(
+          taskId,
+          Optional.<ScheduleStatus>absent(),
+          KILLING,
+          KILL_AUDIT_MESSAGE);
+    }
+    LOG.info(String.format("Waiting for job to terminate before launching cron job %s.", path));
+
+    final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
+    try {
+      // NOTE: We block the quartz execution thread here until we've successfully killed our
+      // ancestor. We mitigate this by using a cached thread pool for quartz.
+      delayedStartBackoff.doUntilSuccess(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
+            LOG.info("Initiating delayed launch of cron " + path);
+            stateManager.insertPendingTasks(deferredLaunch.get().pendingTasks);
+            return true;
+          } else {
+            LOG.info("Not yet safe to run cron " + path);
+            return false;
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      LOG.log(Level.WARNING, "Interrupted while trying to launch cron " + path, e);
+      Thread.currentThread().interrupt();
+      throw new JobExecutionException(e);
+    }
+  }
+
+  /**
+   * Holder for the backoff used while waiting for killed tasks to terminate.
+   */
+  static class Config {
+    private final BackoffHelper delayedStartBackoff;
+
+    Config(BackoffHelper delayedStartBackoff) {
+      this.delayedStartBackoff = checkNotNull(delayedStartBackoff);
+    }
+
+    public BackoffHelper getDelayedStartBackoff() {
+      return delayedStartBackoff;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
new file mode 100644
index 0000000..c5268cb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import org.quartz.Job;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+* Adapter that allows AuroraCronJobs to be constructed by Guice instead of directly by quartz.
+*/
+class AuroraCronJobFactory implements JobFactory {
+ private final Provider<AuroraCronJob> auroraCronJobProvider;
+
+ @Inject
+ AuroraCronJobFactory(Provider<AuroraCronJob> auroraCronJobProvider) {
+ this.auroraCronJobProvider = checkNotNull(auroraCronJobProvider);
+ }
+
+ @Override
+ public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
+ checkState(AuroraCronJob.class.equals(bundle.getJobDetail().getJobClass()),
+ "Quartz tried to run a type of job we don't know about: "
+ + bundle.getJobDetail().getJobClass());
+
+ return auroraCronJobProvider.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
new file mode 100644
index 0000000..8a5f569
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java
@@ -0,0 +1,256 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.storage.JobStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.matchers.GroupMatcher;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Quartz-backed {@link CronJobManager}.
+ *
+ * <p>
+ * NOTE: The source of truth for whether a cron job exists or not is always the JobStore. If state
+ * somehow becomes inconsistent (i.e. a job key is scheduled for execution but its underlying
+ * JobConfiguration does not exist in storage), the execution of the job will log a warning and
+ * exit.
+ */
+class CronJobManagerImpl implements CronJobManager {
+  private static final Logger LOG = Logger.getLogger(CronJobManagerImpl.class.getName());
+
+  private final Storage storage;
+  private final Scheduler scheduler;
+  private final TimeZone timeZone;
+
+  @Inject
+  CronJobManagerImpl(Storage storage, Scheduler scheduler, TimeZone timeZone) {
+    this.storage = checkNotNull(storage);
+    this.scheduler = checkNotNull(scheduler);
+    this.timeZone = checkNotNull(timeZone);
+  }
+
+  @Override
+  public String getManagerKey() {
+    return "CRON";
+  }
+
+  /**
+   * Triggers an immediate run of an existing cron job.
+   *
+   * @param jobKey Key of the cron job to trigger.
+   * @throws CronException If no cron job is stored under the key or Quartz rejects the trigger.
+   */
+  @Override
+  public void startJobNow(final IJobKey jobKey) throws CronException {
+    checkNotNull(jobKey);
+
+    storage.weaklyConsistentRead(new Work<Void, CronException>() {
+      @Override
+      public Void apply(Storage.StoreProvider storeProvider) throws CronException {
+        checkCronExists(jobKey, storeProvider.getJobStore());
+        triggerJob(jobKey);
+        return null;
+      }
+    });
+  }
+
+  // Asks Quartz to fire the job now, translating scheduler failures into CronException.
+  private void triggerJob(IJobKey jobKey) throws CronException {
+    try {
+      scheduler.triggerJob(Quartz.jobKey(jobKey));
+    } catch (SchedulerException e) {
+      throw new CronException(e);
+    }
+    LOG.info(String.format("Triggered cron job for %s.", JobKeys.canonicalString(jobKey)));
+  }
+
+  private static void checkNoRunOverlap(SanitizedCronJob cronJob) throws CronException {
+    // NOTE: We check at create and update instead of in SanitizedCronJob to allow existing jobs
+    // but reject new ones.
+    if (CronCollisionPolicy.RUN_OVERLAP.equals(cronJob.getCronCollisionPolicy())) {
+      throw new CronException(
+          "The RUN_OVERLAP collision policy has been removed (AURORA-38).");
+    }
+  }
+
+  /**
+   * Replaces an existing cron job: the stored configuration and the Quartz schedule are removed
+   * and then re-created from {@code config} inside one storage write.
+   *
+   * @param config Replacement cron job.
+   * @throws CronException If the job uses RUN_OVERLAP, does not exist yet, or cannot be scheduled.
+   */
+  @Override
+  public void updateJob(final SanitizedCronJob config) throws CronException {
+    checkNotNull(config);
+    checkNoRunOverlap(config);
+
+    final IJobKey jobKey = config.getSanitizedConfig().getJobConfig().getKey();
+    storage.write(new MutateWork.NoResult<CronException>() {
+      @Override
+      public void execute(Storage.MutableStoreProvider storeProvider) throws CronException {
+        checkCronExists(jobKey, storeProvider.getJobStore());
+
+        removeJob(jobKey, storeProvider.getJobStore());
+        descheduleJob(jobKey);
+        saveJob(config, storeProvider.getJobStore());
+        scheduleJob(config.getCrontabEntry(), jobKey);
+      }
+    });
+  }
+
+  /**
+   * Stores and schedules a new cron job inside one storage write.
+   *
+   * @param cronJob Cron job to create.
+   * @throws CronException If the job uses RUN_OVERLAP, already exists, or cannot be scheduled.
+   */
+  @Override
+  public void createJob(final SanitizedCronJob cronJob) throws CronException {
+    checkNotNull(cronJob);
+    checkNoRunOverlap(cronJob);
+
+    final IJobKey jobKey = cronJob.getSanitizedConfig().getJobConfig().getKey();
+    storage.write(new MutateWork.NoResult<CronException>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) throws CronException {
+        checkNotExists(jobKey, storeProvider.getJobStore());
+
+        saveJob(cronJob, storeProvider.getJobStore());
+        scheduleJob(cronJob.getCrontabEntry(), jobKey);
+      }
+    });
+  }
+
+  // Fails when a job is already stored under this manager's key for jobKey.
+  private void checkNotExists(IJobKey jobKey, JobStore jobStore) throws CronException {
+    if (jobStore.fetchJob(getManagerKey(), jobKey).isPresent()) {
+      throw new CronException(
+          String.format("Job already exists for %s.", JobKeys.canonicalString(jobKey)));
+    }
+  }
+
+  // Fails when no job is stored under this manager's key for jobKey.
+  private void checkCronExists(IJobKey jobKey, JobStore jobStore) throws CronException {
+    if (!jobStore.fetchJob(getManagerKey(), jobKey).isPresent()) {
+      throw new CronException(
+          String.format("No cron template found for %s.", JobKeys.canonicalString(jobKey)));
+    }
+  }
+
+  private void removeJob(IJobKey jobKey, JobStore.Mutable jobStore) {
+    jobStore.removeJob(jobKey);
+    LOG.info(
+        String.format("Deleted cron job %s from storage.", JobKeys.canonicalString(jobKey)));
+  }
+
+  private void saveJob(SanitizedCronJob cronJob, JobStore.Mutable jobStore) {
+    IJobConfiguration config = cronJob.getSanitizedConfig().getJobConfig();
+
+    jobStore.saveAcceptedJob(getManagerKey(), config);
+    LOG.info(String.format(
+        "Saved new cron job %s to storage.", JobKeys.canonicalString(config.getKey())));
+  }
+
+  /**
+   * Registers a Quartz job and cron trigger for {@code jobKey} without touching storage.
+   *
+   * @param crontabEntry Schedule to fire on.
+   * @param jobKey Key of the job to schedule.
+   * @throws CronException If Quartz refuses to schedule the job.
+   */
+  // TODO(ksweeney): Consider exposing this in the interface and making caller responsible.
+  void scheduleJob(CrontabEntry crontabEntry, IJobKey jobKey) throws CronException {
+    try {
+      scheduler.scheduleJob(
+          Quartz.jobDetail(jobKey, AuroraCronJob.class),
+          Quartz.cronTrigger(crontabEntry, timeZone));
+    } catch (SchedulerException e) {
+      throw new CronException(e);
+    }
+    LOG.info(String.format(
+        "Scheduled job %s with schedule %s.", JobKeys.canonicalString(jobKey), crontabEntry));
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> getJobs() {
+    // NOTE: no synchronization is needed here since we don't touch internal quartz state.
+    return storage.consistentRead(new Work.Quiet<Iterable<IJobConfiguration>>() {
+      @Override
+      public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) {
+        return storeProvider.getJobStore().fetchJobs(getManagerKey());
+      }
+    });
+  }
+
+  @Override
+  public boolean hasJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    return storage.consistentRead(new Work.Quiet<Boolean>() {
+      @Override
+      public Boolean apply(Storage.StoreProvider storeProvider) {
+        return storeProvider.getJobStore().fetchJob(getManagerKey(), jobKey).isPresent();
+      }
+    });
+  }
+
+  /**
+   * Removes a cron job from storage and from the Quartz scheduler.
+   *
+   * @param jobKey Key of the cron job to delete.
+   * @return {@code true} if a job was stored under the key and has been removed, {@code false}
+   *     if there was nothing to delete.
+   */
+  @Override
+  public boolean deleteJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override
+      public Boolean apply(Storage.MutableStoreProvider storeProvider) {
+        // Check through this transaction's own store, as createJob and updateJob do, rather than
+        // calling hasJob(), which would start a separate read while the write is in progress.
+        JobStore.Mutable jobStore = storeProvider.getJobStore();
+        if (!jobStore.fetchJob(getManagerKey(), jobKey).isPresent()) {
+          return false;
+        }
+
+        removeJob(jobKey, jobStore);
+        descheduleJob(jobKey);
+        return true;
+      }
+    });
+  }
+
+  // Best-effort removal from Quartz: scheduler failures are logged, not propagated.
+  private void descheduleJob(IJobKey jobKey) {
+    String path = JobKeys.canonicalString(jobKey);
+    try {
+      // TODO(ksweeney): Consider interrupting the running job here.
+      // There's a race here where an old running job could fail to find the old config. That's
+      // fine given that the behavior of AuroraCronJob is to log an error and exit if it's unable
+      // to find a job for its key.
+      scheduler.deleteJob(Quartz.jobKey(jobKey));
+      LOG.info("Successfully descheduled " + path + ".");
+    } catch (SchedulerException e) {
+      LOG.log(Level.WARNING, "Error when attempting to deschedule " + path + ": " + e, e);
+    }
+  }
+
+  /**
+   * Dumps what Quartz currently has scheduled, keyed by Aurora job key. The schedule of each job
+   * is parsed back from the description stored on its Quartz job detail.
+   *
+   * @return Snapshot of scheduled job keys and their crontab entries.
+   */
+  @Override
+  public Map<IJobKey, CrontabEntry> getScheduledJobs() {
+    // NOTE: no synchronization is needed here since this is just a dump of internal quartz state
+    // for debugging.
+    ImmutableMap.Builder<IJobKey, CrontabEntry> scheduledJobs = ImmutableMap.builder();
+    try {
+      for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.<JobKey>anyGroup())) {
+        // A job may disappear between listing the keys and fetching its detail; skip those.
+        Optional<JobDetail> jobDetail = Optional.fromNullable(scheduler.getJobDetail(jobKey));
+        if (jobDetail.isPresent()) {
+          scheduledJobs.put(
+              Quartz.auroraJobKey(jobKey), CrontabEntry.parse(jobDetail.get().getDescription()));
+        }
+      }
+    } catch (SchedulerException e) {
+      throw Throwables.propagate(e);
+    }
+    return scheduledJobs.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
new file mode 100644
index 0000000..64fa068
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.configuration.ConfigurationManager;
+import org.apache.aurora.scheduler.cron.CronException;
+import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.quartz.Scheduler;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manager for startup and teardown of Quartz scheduler.
+ */
+class CronLifecycle extends AbstractIdleService implements PubsubEvent.EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(CronLifecycle.class.getName());
+
+  // Exported gauges/counters: set to 1 once Quartz is started / once stored jobs were processed.
+  private static final AtomicInteger RUNNING_FLAG = Stats.exportInt("quartz_scheduler_running");
+  private static final AtomicInteger LOADED_FLAG = Stats.exportInt("cron_jobs_loaded");
+  private static final AtomicLong LAUNCH_FAILURES = Stats.exportLong("cron_job_launch_failures");
+
+  private final Scheduler scheduler;
+  private final ShutdownRegistry shutdownRegistry;
+  private final CronJobManagerImpl cronJobManager;
+
+  @Inject
+  CronLifecycle(
+      Scheduler scheduler,
+      ShutdownRegistry shutdownRegistry,
+      CronJobManagerImpl cronJobManager) {
+
+    this.scheduler = checkNotNull(scheduler);
+    this.shutdownRegistry = checkNotNull(shutdownRegistry);
+    this.cronJobManager = checkNotNull(cronJobManager);
+  }
+
+  /**
+   * Notifies the cronScheduler job manager that the scheduler is active, and job configurations
+   * are ready to load. Starts this service, registers a shutdown action that stops it again, and
+   * blocks until the service is running.
+   *
+   * @param schedulerActive Event.
+   */
+  @Subscribe
+  public void schedulerActive(PubsubEvent.SchedulerActive schedulerActive) {
+    startAsync();
+    shutdownRegistry.addAction(new Command() {
+      @Override
+      public void execute() {
+        CronLifecycle.this.stopAsync().awaitTerminated();
+      }
+    });
+    awaitRunning();
+  }
+
+  /**
+   * Starts Quartz and schedules every cron job found in storage. A job that fails validation or
+   * scheduling is logged and counted but does not prevent the remaining jobs from loading.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting Quartz cron scheduler " + scheduler.getSchedulerName() + ".");
+    scheduler.start();
+    RUNNING_FLAG.set(1);
+
+    // TODO(ksweeney): Refactor the interface - we really only need the job keys here.
+    for (IJobConfiguration job : cronJobManager.getJobs()) {
+      try {
+        SanitizedCronJob cronJob = SanitizedCronJob.fromUnsanitized(job);
+        cronJobManager.scheduleJob(
+            cronJob.getCrontabEntry(),
+            cronJob.getSanitizedConfig().getJobConfig().getKey());
+      } catch (CronException | ConfigurationManager.TaskDescriptionException e) {
+        logLaunchFailure(job, e);
+      }
+    }
+    LOADED_FLAG.set(1);
+  }
+
+  // Records one stored job that could not be scheduled during startup.
+  private void logLaunchFailure(IJobConfiguration job, Exception e) {
+    LAUNCH_FAILURES.incrementAndGet();
+    LOG.log(Level.SEVERE, "Scheduling failed for recovered job " + job, e);
+  }
+
+  /**
+   * Shuts Quartz down and clears the running gauge.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Shutting down Quartz cron scheduler.");
+    scheduler.shutdown();
+    RUNNING_FLAG.set(0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
new file mode 100644
index 0000000..6934828
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Throwables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffHelper;
+
+import org.apache.aurora.scheduler.cron.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.DirectSchedulerFactory;
+import org.quartz.simpl.RAMJobStore;
+import org.quartz.simpl.SimpleThreadPool;
+
+/**
+ * Provides a {@link CronJobManager} with a Quartz backend. While Quartz itself supports
+ * persistence, the scheduler exposed by this module does not persist any state - it simply
+ * creates tasks from a {@link org.apache.aurora.gen.JobConfiguration} template on a cron-style
+ * schedule.
+ */
+public class CronModule extends AbstractModule {
+ private static final Logger LOG = Logger.getLogger(CronModule.class.getName());
+
+ @CmdLine(name = "cron_scheduler_num_threads",
+ help = "Number of threads to use for the cron scheduler thread pool.")
+ private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+
+ @CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
+ private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
+
+ @CmdLine(name = "cron_start_initial_backoff", help =
+ "Initial backoff delay while waiting for a previous cron run to be killed.")
+ public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
+ Arg.create(Amount.of(1L, Time.SECONDS));
+
+ @CmdLine(name = "cron_start_max_backoff", help =
+ "Max backoff delay while waiting for a previous cron run to be killed.")
+ public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
+ Arg.create(Amount.of(1L, Time.MINUTES));
+
+ // Global per-JVM ID number generator for the provided Quartz Scheduler.
+ private static final AtomicLong ID_GENERATOR = new AtomicLong();
+
+ @Override
+ protected void configure() {
+ bind(CronPredictor.class).to(CronPredictorImpl.class);
+ bind(CronPredictorImpl.class).in(Singleton.class);
+
+ bind(CronJobManager.class).to(CronJobManagerImpl.class);
+ bind(CronJobManagerImpl.class).in(Singleton.class);
+
+ bind(CronScheduler.class).to(CronSchedulerImpl.class);
+ bind(CronSchedulerImpl.class).in(Singleton.class);
+
+ bind(AuroraCronJobFactory.class).in(Singleton.class);
+
+ bind(AuroraCronJob.class).in(Singleton.class);
+ bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
+ new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
+
+ bind(CronLifecycle.class).in(Singleton.class);
+ PubsubEventModule.bindSubscriber(binder(), CronLifecycle.class);
+ }
+
+ @Provides
+ private TimeZone provideTimeZone() {
+ TimeZone timeZone = TimeZone.getTimeZone(CRON_TIMEZONE.get());
+ TimeZone systemTimeZone = TimeZone.getDefault();
+ if (!timeZone.equals(systemTimeZone)) {
+ LOG.warning("Cron schedules are configured to fire according to timezone "
+ + timeZone.getDisplayName()
+ + " but system timezone is set to "
+ + systemTimeZone.getDisplayName());
+ }
+ return timeZone;
+ }
+
+ /*
+ * NOTE: Quartz implements DirectSchedulerFactory as a mutable global singleton in a static
+ * variable. While the Scheduler instances it produces are independent we synchronize here to
+ * avoid an initialization race across injectors. In practice this only shows up during testing;
+ * production Aurora instances will only have one object graph at a time.
+ */
+ @Provides
+ @Singleton
+ private static synchronized Scheduler provideScheduler(AuroraCronJobFactory jobFactory) {
+ SimpleThreadPool threadPool = new SimpleThreadPool(NUM_THREADS.get(), Thread.NORM_PRIORITY);
+ threadPool.setMakeThreadsDaemons(true);
+
+ DirectSchedulerFactory schedulerFactory = DirectSchedulerFactory.getInstance();
+ String schedulerName = "aurora-cron-" + ID_GENERATOR.incrementAndGet();
+ try {
+ schedulerFactory.createScheduler(schedulerName, schedulerName, threadPool, new RAMJobStore());
+ Scheduler scheduler = schedulerFactory.getScheduler(schedulerName);
+ scheduler.setJobFactory(jobFactory);
+ return scheduler;
+ } catch (SchedulerException e) {
+ LOG.severe("Error initializing Quartz cron scheduler: " + e);
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
new file mode 100644
index 0000000..fb24c28
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronPredictorImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.Date;
+import java.util.TimeZone;
+
+import javax.inject.Inject;
+
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.cron.CronPredictor;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.quartz.CronExpression;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CronPredictor} that evaluates schedules as Quartz {@link CronExpression}s in the
+ * injected time zone.
+ */
+class CronPredictorImpl implements CronPredictor {
+  private final Clock clock;
+  private final TimeZone timeZone;
+
+  @Inject
+  CronPredictorImpl(Clock clock, TimeZone timeZone) {
+    this.clock = checkNotNull(clock);
+    this.timeZone = checkNotNull(timeZone);
+  }
+
+  /**
+   * Returns the next time after the clock's current time that satisfies {@code schedule}.
+   */
+  @Override
+  public Date predictNextRun(CrontabEntry schedule) {
+    CronExpression expression = Quartz.cronExpression(schedule, timeZone);
+    Date now = new Date(clock.nowMillis());
+    return expression.getNextValidTimeAfter(now);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
new file mode 100644
index 0000000..3bef22d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.twitter.common.base.Function;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.CronTrigger;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.cron.quartz.Quartz.jobKey;
+
+/**
+ * {@link CronScheduler} that reads schedules back from the cron triggers registered with a
+ * Quartz {@link Scheduler}.
+ */
+class CronSchedulerImpl implements CronScheduler {
+  private static final Logger LOG = Logger.getLogger(CronSchedulerImpl.class.getName());
+
+  private final Scheduler scheduler;
+
+  @Inject
+  CronSchedulerImpl(Scheduler scheduler) {
+    this.scheduler = checkNotNull(scheduler);
+  }
+
+  /**
+   * Looks up the cron schedule of a job from its Quartz cron trigger.
+   *
+   * @param jobKey Key of the job to look up; must not be null.
+   * @return The schedule parsed from the job's single cron trigger, or absent when the job has
+   *     no cron trigger or Quartz fails to read its triggers.
+   * @throws IllegalArgumentException If the job has more than one cron trigger.
+   */
+  @Override
+  public Optional<CrontabEntry> getSchedule(IJobKey jobKey) throws IllegalStateException {
+    checkNotNull(jobKey);
+
+    try {
+      FluentIterable<CrontabEntry> schedules =
+          FluentIterable.from(scheduler.getTriggersOfJob(jobKey(jobKey)))
+              .filter(CronTrigger.class)
+              .transform(new Function<CronTrigger, CrontabEntry>() {
+                @Override
+                public CrontabEntry apply(CronTrigger trigger) {
+                  return Quartz.crontabEntry(trigger);
+                }
+              });
+      // A job without a cron trigger has no schedule; report that as absent rather than letting
+      // getOnlyElement throw NoSuchElementException on the empty sequence.
+      if (schedules.isEmpty()) {
+        return Optional.absent();
+      }
+      return Optional.of(Iterables.getOnlyElement(schedules));
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE,
+          "Error reading job " + JobKeys.canonicalString(jobKey)
+              + " cron expression from Quartz: " + e,
+          e);
+      return Optional.absent();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
new file mode 100644
index 0000000..63aaade
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/Quartz.java
@@ -0,0 +1,124 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.cron.quartz;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.TimeZone;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.cron.CrontabEntry;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.TriggerBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Utilities for converting Aurora datatypes to Quartz datatypes.
+ */
+final class Quartz {
+ private Quartz() {
+ // Utility class.
+ }
+
+ /**
+ * Convert an Aurora CrontabEntry to a Quartz CronExpression.
+ */
+ static CronExpression cronExpression(CrontabEntry entry, TimeZone timeZone) {
+ String dayOfMonth;
+ if (entry.hasWildcardDayOfMonth()) {
+ dayOfMonth = "?"; // special quartz token meaning "don't care"
+ } else {
+ dayOfMonth = entry.getDayOfMonthAsString();
+ }
+ String dayOfWeek;
+ if (entry.hasWildcardDayOfWeek() && !entry.hasWildcardDayOfMonth()) {
+ dayOfWeek = "?";
+ } else {
+ List<Integer> daysOfWeek = Lists.newArrayList();
+ for (Range<Integer> range : entry.getDayOfWeek().asRanges()) {
+ for (int i : ContiguousSet.create(range, DiscreteDomain.integers())) {
+ daysOfWeek.add(i + 1); // Quartz has an off-by-one with what the "standard" defines.
+ }
+ }
+ dayOfWeek = Joiner.on(",").join(daysOfWeek);
+ }
+
+ String rawCronExpresion = Joiner.on(" ").join(
+ "0",
+ entry.getMinuteAsString(),
+ entry.getHourAsString(),
+ dayOfMonth,
+ entry.getMonthAsString(),
+ dayOfWeek);
+ CronExpression cronExpression;
+ try {
+ cronExpression = new CronExpression(rawCronExpresion);
+ } catch (ParseException e) {
+ throw Throwables.propagate(e);
+ }
+ cronExpression.setTimeZone(timeZone);
+ return cronExpression;
+ }
+
+ /**
+ * Convert a Quartz JobKey to an Aurora IJobKey.
+ */
+ static IJobKey auroraJobKey(org.quartz.JobKey jobKey) {
+ return JobKeys.parse(jobKey.getName());
+ }
+
+ /**
+ * Convert an Aurora IJobKey to a Quartz JobKey.
+ */
+ static JobKey jobKey(IJobKey jobKey) {
+ return JobKey.jobKey(JobKeys.canonicalString(jobKey));
+ }
+
+ static CronTrigger cronTrigger(CrontabEntry schedule, TimeZone timeZone) {
+ return TriggerBuilder.newTrigger()
+ .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression(schedule, timeZone)))
+ .withDescription(schedule.toString())
+ .build();
+ }
+
+ static JobDetail jobDetail(IJobKey jobKey, Class<? extends Job> jobClass) {
+ checkNotNull(jobKey);
+ checkNotNull(jobClass);
+
+ return JobBuilder.newJob(jobClass)
+ .withIdentity(jobKey(jobKey))
+ .build();
+ }
+
+ static CrontabEntry crontabEntry(CronTrigger cronTrigger) {
+ return CrontabEntry.parse(cronTrigger.getDescription());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java b/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
deleted file mode 100644
index 61b01d2..0000000
--- a/src/main/java/org/apache/aurora/scheduler/cron/testing/AbstractCronIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.cron.testing;
-
-import java.io.InputStreamReader;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.scheduler.cron.CronPredictor;
-import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test to verify conformance with the {@link CronScheduler} interface.
- */
-public abstract class AbstractCronIT {
- private static final String WILDCARD_SCHEDULE = "* * * * *";
-
- /**
- * Child should return an instance of the {@link CronScheduler} test under test here.
- */
- protected abstract CronScheduler makeCronScheduler() throws Exception;
-
- /**
- * Child should return an instance of the {@link CronPredictor} under test here.
- *
- * @param clock The clock the predictor should use.
- */
- protected abstract CronPredictor makeCronPredictor(Clock clock) throws Exception;
-
- @Test
- public void testCronSchedulerLifecycle() throws Exception {
- CronScheduler scheduler = makeCronScheduler();
-
- scheduler.startAsync().awaitRunning();
- final CountDownLatch cronRan = new CountDownLatch(1);
- scheduler.schedule(WILDCARD_SCHEDULE, new Runnable() {
- @Override public void run() {
- cronRan.countDown();
- }
- });
- cronRan.await();
- scheduler.stopAsync().awaitTerminated();
- }
-
- @Test
- public void testCronPredictorConforms() throws Exception {
- FakeClock clock = new FakeClock();
- CronPredictor cronPredictor = makeCronPredictor(clock);
-
- for (TriggerPrediction triggerPrediction : getExpectedTriggerPredictions()) {
- List<Long> results = Lists.newArrayList();
- clock.setNowMillis(0);
- for (int i = 0; i < triggerPrediction.getTriggerTimes().size(); i++) {
- Date nextTriggerTime = cronPredictor.predictNextRun(triggerPrediction.getSchedule());
- results.add(nextTriggerTime.getTime());
- clock.setNowMillis(nextTriggerTime.getTime());
- }
- assertEquals("Cron schedule "
- + triggerPrediction.getSchedule()
- + " should have have predicted trigger times "
- + triggerPrediction.getTriggerTimes()
- + " but predicted "
- + results
- + " instead.", triggerPrediction.getTriggerTimes(), results);
- }
- }
-
- @Test
- public void testCronScheduleValidatorAcceptsValidSchedules() throws Exception {
- CronScheduler cron = makeCronScheduler();
-
- for (TriggerPrediction triggerPrediction : getExpectedTriggerPredictions()) {
- assertTrue("Cron schedule " + triggerPrediction.getSchedule() + " should pass validation.",
- cron.isValidSchedule(triggerPrediction.getSchedule()));
- }
- }
-
- private static List<TriggerPrediction> getExpectedTriggerPredictions() {
- return new Gson()
- .fromJson(
- new InputStreamReader(
- AbstractCronIT.class.getResourceAsStream("cron-schedule-predictions.json")),
- new TypeToken<List<TriggerPrediction>>() { }.getType());
- }
-
- /**
- * A schedule and the expected iteratively-applied prediction results.
- */
- public static class TriggerPrediction {
- private String schedule;
- private List<Long> triggerTimes;
-
- private TriggerPrediction() {
- // GSON constructor.
- }
-
- public TriggerPrediction(String schedule, List<Long> triggerTimes) {
- this.schedule = schedule;
- this.triggerTimes = triggerTimes;
- }
-
- public String getSchedule() {
- return schedule;
- }
-
- public List<Long> getTriggerTimes() {
- return ImmutableList.copyOf(triggerTimes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c285f2f8/src/main/java/org/apache/aurora/scheduler/http/Cron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Cron.java b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
index 80a398a..d8c44f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Cron.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
@@ -26,7 +26,7 @@ import javax.ws.rs.core.Response;
import com.google.common.collect.ImmutableMap;
-import org.apache.aurora.scheduler.state.CronJobManager;
+import org.apache.aurora.scheduler.cron.CronJobManager;
/**
* HTTP interface to dump state of the internal cron scheduler.
@@ -50,7 +50,6 @@ public class Cron {
public Response dumpContents() {
Map<String, Object> response = ImmutableMap.<String, Object>builder()
.put("scheduled", cronManager.getScheduledJobs())
- .put("pending", cronManager.getPendingRuns())
.build();
return Response.ok(response).build();