You are viewing a plain text version of this content. (The hyperlink to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/16 21:45:18 UTC
[2/3] aurora git commit: Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.
Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.
Reviewed at https://reviews.apache.org/r/51763/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2cb43d61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2cb43d61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2cb43d61
Branch: refs/heads/master
Commit: 2cb43d61ecafb79b31d36332ef4713b9857b3c1a
Parents: ebfeb3e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:26 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:26 2016 -0700
----------------------------------------------------------------------
.../aurora/common/util/BackoffHelper.java | 8 +
.../aurora/common/util/BackoffHelperTest.java | 7 +
.../scheduler/cron/quartz/AuroraCronJob.java | 239 +++++++++++--------
.../scheduler/cron/quartz/CronModule.java | 25 +-
.../cron/quartz/AuroraCronJobTest.java | 107 ++++++---
.../aurora/scheduler/cron/quartz/CronIT.java | 4 +
6 files changed, 256 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
index 8e73dd9..517c0ef 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
@@ -90,6 +90,14 @@ public class BackoffHelper {
}
/**
+ * Gets {@link BackoffStrategy} instance the BackoffHelper is initialized with.
+ * @return instance of {@link BackoffStrategy} used by BackoffHelper.
+ */
+ public BackoffStrategy getBackoffStrategy() {
+ return backoffStrategy;
+ }
+
+ /**
* Executes the given task using the configured backoff strategy until the task succeeds as
* indicated by returning a non-null value.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
index bc30990..012fbac 100644
--- a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
@@ -41,6 +41,13 @@ public class BackoffHelperTest extends EasyMockTest {
}
@Test
+ public void testGetBackoffStrategy() {
+ control.replay();
+
+ assertEquals(backoffStrategy, backoffHelper.getBackoffStrategy());
+ }
+
+ @Test
public void testDoUntilSuccess() throws Exception {
ExceptionalSupplier<Boolean, RuntimeException> task =
createMock(new Clazz<ExceptionalSupplier<Boolean, RuntimeException>>() { });
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index c07551e..7c8047a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -13,29 +13,37 @@
*/
package org.apache.aurora.scheduler.cron.quartz;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
import java.util.Date;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
+import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.cron.CronException;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -43,9 +51,14 @@ import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkState;
@@ -61,7 +74,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
* scheduler should therefore be configured with a large number of threads.
*/
@DisallowConcurrentExecution
-class AuroraCronJob implements Job {
+@PersistJobDataAfterExecution
+class AuroraCronJob implements Job, EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(AuroraCronJob.class);
private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
@@ -69,147 +83,166 @@ class AuroraCronJob implements Job {
private static final AtomicLong CRON_JOB_PARSE_FAILURES =
Stats.exportLong("cron_job_parse_failures");
private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+ private static final AtomicLong CRON_JOB_CONCURRENT_RUNS =
+ Stats.exportLong("cron_job_concurrent_runs");
@VisibleForTesting
static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
private final ConfigurationManager configurationManager;
- private final Storage storage;
private final StateManager stateManager;
private final BackoffHelper delayedStartBackoff;
+ private final BatchWorker<NoResult> batchWorker;
+ private final Set<IJobKey> killFollowups = Sets.newConcurrentHashSet();
+
+ /**
+ * Annotation for the max cron batch size.
+ */
+ @VisibleForTesting
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ @interface CronMaxBatchSize { }
+
+ static class CronBatchWorker extends BatchWorker<NoResult> {
+ @Inject
+ CronBatchWorker(
+ Storage storage,
+ StatsProvider statsProvider,
+ @CronMaxBatchSize int maxBatchSize) {
+
+ super(storage, statsProvider, maxBatchSize);
+ }
+
+ @Override
+ protected String serviceName() {
+ return "CronBatchWorker";
+ }
+ }
@Inject
AuroraCronJob(
ConfigurationManager configurationManager,
Config config,
- Storage storage,
- StateManager stateManager) {
+ StateManager stateManager,
+ CronBatchWorker batchWorker) {
this.configurationManager = requireNonNull(configurationManager);
- this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
+ this.batchWorker = requireNonNull(batchWorker);
this.delayedStartBackoff = requireNonNull(config.getDelayedStartBackoff());
}
- private static final class DeferredLaunch {
- private final ITaskConfig task;
- private final Set<Integer> instanceIds;
- private final Set<String> activeTaskIds;
-
- DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) {
- this.task = task;
- this.instanceIds = instanceIds;
- this.activeTaskIds = activeTaskIds;
- }
- }
-
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// We assume quartz prevents concurrent runs of this job for a given job key. This allows us
// to avoid races where we might kill another run's tasks.
checkState(context.getJobDetail().isConcurrentExectionDisallowed());
- doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+ doExecute(context);
}
@VisibleForTesting
- void doExecute(final IJobKey key) throws JobExecutionException {
+ void doExecute(JobExecutionContext context) throws JobExecutionException {
+ final IJobKey key = Quartz.auroraJobKey(context.getJobDetail().getKey());
final String path = JobKeys.canonicalString(key);
- final Optional<DeferredLaunch> deferredLaunch = storage.write(
- (MutateWork.Quiet<Optional<DeferredLaunch>>) storeProvider -> {
- Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
- if (!config.isPresent()) {
- LOG.warn(
- "Cron was triggered for {} but no job with that key was found in storage.",
- path);
- CRON_JOB_MISFIRES.incrementAndGet();
- return Optional.absent();
- }
-
- SanitizedCronJob cronJob;
- try {
- cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
- } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
- LOG.warn(
- "Invalid cron job for {} in storage - failed to parse with {}", key, e);
- CRON_JOB_PARSE_FAILURES.incrementAndGet();
- return Optional.absent();
- }
-
- CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
- LOG.info(
- "Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
- CRON_JOB_TRIGGERS.incrementAndGet();
-
- final Query.Builder activeQuery = Query.jobScoped(key).active();
- Set<String> activeTasks =
- Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+ // Prevent a concurrent run for this job in case a previous trigger took longer to run.
+ // This approach relies on saving the "work in progress" token within the job context itself
+ // (see below) and relying on killFollowups to signal "work completion".
+ if (context.getJobDetail().getJobDataMap().containsKey(path)) {
+ CRON_JOB_CONCURRENT_RUNS.incrementAndGet();
+ if (killFollowups.contains(key)) {
+ context.getJobDetail().getJobDataMap().remove(path);
+ killFollowups.remove(key);
+ LOG.info("Resetting job context for cron " + path);
+ } else {
+ LOG.info("Ignoring trigger as another concurrent run is active for cron " + path);
+ return;
+ }
+ }
- ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
- Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
- if (activeTasks.isEmpty()) {
- stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+ CompletableFuture<NoResult> scheduleResult = batchWorker.<NoResult>execute(storeProvider -> {
+ Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
+ if (!config.isPresent()) {
+ LOG.warn("Cron was triggered for {} but no job with that key was found in storage.", path);
+ CRON_JOB_MISFIRES.incrementAndGet();
+ return BatchWorker.NO_RESULT;
+ }
- return Optional.absent();
- }
+ SanitizedCronJob cronJob;
+ try {
+ cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
+ } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+ LOG.warn("Invalid cron job for {} in storage - failed to parse with {}", key, e);
+ CRON_JOB_PARSE_FAILURES.incrementAndGet();
+ return BatchWorker.NO_RESULT;
+ }
- CRON_JOB_COLLISIONS.incrementAndGet();
- switch (collisionPolicy) {
- case KILL_EXISTING:
- return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks));
+ CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+ LOG.info("Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
+ CRON_JOB_TRIGGERS.incrementAndGet();
- case RUN_OVERLAP:
- LOG.error("Ignoring trigger for job {} with deprecated collision"
- + "policy RUN_OVERLAP due to unterminated active tasks.", path);
- return Optional.absent();
+ final Query.Builder activeQuery = Query.jobScoped(key).active();
+ Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
- case CANCEL_NEW:
- return Optional.absent();
+ ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
+ Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
+ if (activeTasks.isEmpty()) {
+ stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+ return BatchWorker.NO_RESULT;
+ }
- default:
- LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
- return Optional.absent();
+ CRON_JOB_COLLISIONS.incrementAndGet();
+ switch (collisionPolicy) {
+ case KILL_EXISTING:
+ for (String taskId : activeTasks) {
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.absent(),
+ KILLING,
+ KILL_AUDIT_MESSAGE);
}
- }
- );
- if (!deferredLaunch.isPresent()) {
- return;
- }
-
- storage.write((NoResult.Quiet) storeProvider -> {
- for (String taskId : deferredLaunch.get().activeTaskIds) {
- stateManager.changeState(
- storeProvider,
- taskId,
- Optional.absent(),
- KILLING,
- KILL_AUDIT_MESSAGE);
+ LOG.info("Waiting for job to terminate before launching cron job " + path);
+ // Use job detail map to signal a "work in progress" condition to subsequent triggers.
+ context.getJobDetail().getJobDataMap().put(path, null);
+ batchWorker.executeWithReplay(
+ delayedStartBackoff.getBackoffStrategy(),
+ store -> {
+ Query.Builder query = Query.taskScoped(activeTasks).active();
+ if (Iterables.isEmpty(storeProvider.getTaskStore().fetchTasks(query))) {
+ LOG.info("Initiating delayed launch of cron " + path);
+ stateManager.insertPendingTasks(store, task, instanceIds);
+ return new BatchWorker.Result<>(true, null);
+ } else {
+ LOG.info("Not yet safe to run cron " + path);
+ return new BatchWorker.Result<>(false, null);
+ }
+ })
+ .thenAccept(ignored -> {
+ killFollowups.add(key);
+ LOG.info("Finished delayed launch for cron " + path);
+ });
+ break;
+
+ case RUN_OVERLAP:
+ LOG.error("Ignoring trigger for job {} with deprecated collision"
+ + "policy RUN_OVERLAP due to unterminated active tasks.", path);
+ break;
+
+ case CANCEL_NEW:
+ break;
+
+ default:
+ LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
}
+ return BatchWorker.NO_RESULT;
});
- LOG.info("Waiting for job to terminate before launching cron job {}.", path);
-
- final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
try {
- // NOTE: We block the quartz execution thread here until we've successfully killed our
- // ancestor. We mitigate this by using a cached thread pool for quartz.
- delayedStartBackoff.doUntilSuccess(() -> {
- if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
- LOG.info("Initiating delayed launch of cron " + path);
- storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks(
- storeProvider,
- deferredLaunch.get().task,
- deferredLaunch.get().instanceIds));
-
- return true;
- } else {
- LOG.info("Not yet safe to run cron " + path);
- return false;
- }
- });
- } catch (InterruptedException e) {
+ scheduleResult.get();
+ } catch (ExecutionException | InterruptedException e) {
LOG.warn("Interrupted while trying to launch cron " + path, e);
Thread.currentThread().interrupt();
throw new JobExecutionException(e);
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 155d702..9c88a2a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -21,16 +21,19 @@ import javax.inject.Singleton;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
import org.apache.aurora.common.args.Arg;
import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.Positive;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
@@ -38,6 +41,7 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_ID;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_NAME;
@@ -55,7 +59,7 @@ public class CronModule extends AbstractModule {
@CmdLine(name = "cron_scheduler_num_threads",
help = "Number of threads to use for the cron scheduler thread pool.")
- private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+ private static final Arg<Integer> NUM_THREADS = Arg.create(10);
@CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
@@ -63,13 +67,18 @@ public class CronModule extends AbstractModule {
@CmdLine(name = "cron_start_initial_backoff", help =
"Initial backoff delay while waiting for a previous cron run to be killed.")
public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
- Arg.create(Amount.of(1L, Time.SECONDS));
+ Arg.create(Amount.of(5L, Time.SECONDS));
@CmdLine(name = "cron_start_max_backoff", help =
"Max backoff delay while waiting for a previous cron run to be killed.")
public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
Arg.create(Amount.of(1L, Time.MINUTES));
+ @Positive
+ @CmdLine(name = "cron_scheduling_max_batch_size",
+ help = "The maximum number of triggered cron jobs that can be processed in a batch.")
+ private static final Arg<Integer> CRON_MAX_BATCH_SIZE = Arg.create(10);
+
// Global per-JVM ID number generator for the provided Quartz Scheduler.
private static final AtomicLong ID_GENERATOR = new AtomicLong();
@@ -90,8 +99,16 @@ public class CronModule extends AbstractModule {
bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
+ PubsubEventModule.bindSubscriber(binder(), AuroraCronJob.class);
+
bind(CronLifecycle.class).in(Singleton.class);
- SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+ addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+
+ bind(new TypeLiteral<Integer>() { })
+ .annotatedWith(AuroraCronJob.CronMaxBatchSize.class)
+ .toInstance(CRON_MAX_BATCH_SIZE.get());
+ bind(CronBatchWorker.class).in(Singleton.class);
+ addSchedulerActiveServiceBinding(binder()).to(CronBatchWorker.class);
}
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 5c64ff2..fb06c28 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -13,17 +13,23 @@
*/
package org.apache.aurora.scheduler.cron.quartz;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-import org.apache.aurora.common.base.ExceptionalSupplier;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.CronCollisionPolicy;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.RepeatableWork;
+import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
@@ -31,11 +37,17 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.easymock.Capture;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.impl.JobDetailImpl;
+import static org.apache.aurora.scheduler.cron.quartz.QuartzTestUtil.AURORA_JOB_KEY;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@@ -43,50 +55,61 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AuroraCronJobTest extends EasyMockTest {
- public static final String TASK_ID = "A";
+ private static final String TASK_ID = "A";
+ private JobDetailImpl jobDetails;
private Storage storage;
private StateManager stateManager;
private BackoffHelper backoffHelper;
-
+ private CronBatchWorker batchWorker;
+ private JobExecutionContext context;
private AuroraCronJob auroraCronJob;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
storage = DbUtil.createStorage();
stateManager = createMock(StateManager.class);
backoffHelper = createMock(BackoffHelper.class);
+ context = createMock(JobExecutionContext.class);
+
+ jobDetails = new JobDetailImpl();
+ jobDetails.setKey(Quartz.jobKey(AURORA_JOB_KEY));
+ jobDetails.setJobDataMap(new JobDataMap(new HashMap()));
+ expect(context.getJobDetail()).andReturn(jobDetails).anyTimes();
+
+ batchWorker = createMock(CronBatchWorker.class);
+ expectBatchExecute(batchWorker, storage, control).anyTimes();
auroraCronJob = new AuroraCronJob(
TaskTestUtil.CONFIGURATION_MANAGER,
- new AuroraCronJob.Config(backoffHelper), storage, stateManager);
+ new AuroraCronJob.Config(backoffHelper),
+ stateManager,
+ batchWorker);
}
@Test
public void testExecuteNonexistentIsNoop() throws JobExecutionException {
control.replay();
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
}
@Test
public void testEmptyStorage() throws JobExecutionException {
- stateManager.insertPendingTasks(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject());
+ stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
expectLastCall().times(3);
control.replay();
+
populateStorage(CronCollisionPolicy.CANCEL_NEW);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- storage = DbUtil.createStorage();
+ auroraCronJob.doExecute(context);
- populateStorage(CronCollisionPolicy.KILL_EXISTING);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
storage = DbUtil.createStorage();
+ populateStorage(CronCollisionPolicy.KILL_EXISTING);
+ auroraCronJob.doExecute(context);
+ storage = DbUtil.createStorage();
populateStorage(CronCollisionPolicy.RUN_OVERLAP);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
}
@Test
@@ -95,35 +118,65 @@ public class AuroraCronJobTest extends EasyMockTest {
populateTaskStore();
populateStorage(CronCollisionPolicy.CANCEL_NEW);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
+ }
+
+ @Test
+ public void testOverlap() throws JobExecutionException {
+ control.replay();
+
+ populateTaskStore();
+ populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+ auroraCronJob.doExecute(context);
}
@Test
public void testKillExisting() throws Exception {
- Capture<ExceptionalSupplier<Boolean, RuntimeException>> capture = createCapture();
+ Capture<RepeatableWork<BatchWorker.NoResult>> killCapture = createCapture();
+ CompletableFuture<BatchWorker.NoResult> killResult = new CompletableFuture<>();
+ expect(batchWorker.executeWithReplay(anyObject(), capture(killCapture))).andReturn(killResult);
+ expect(backoffHelper.getBackoffStrategy()).andReturn(null).anyTimes();
expect(stateManager.changeState(
- EasyMock.anyObject(),
+ anyObject(),
eq(TASK_ID),
eq(Optional.absent()),
eq(ScheduleStatus.KILLING),
eq(AuroraCronJob.KILL_AUDIT_MESSAGE)))
.andReturn(StateChangeResult.SUCCESS);
- backoffHelper.doUntilSuccess(EasyMock.capture(capture));
- stateManager.insertPendingTasks(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject());
+ stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
+ expectLastCall().times(2);
control.replay();
populateStorage(CronCollisionPolicy.KILL_EXISTING);
populateTaskStore();
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- assertFalse(capture.getValue().get());
+ auroraCronJob.doExecute(context);
+
storage.write(
(NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
- assertTrue(capture.getValue().get());
+ storage.write((NoResult.Quiet) store -> killCapture.getValue().apply(store));
+
+ // Simulate a trigger in progress.
+ jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+ assertFalse(jobDetails.getJobDataMap().isEmpty());
+
+ // Attempt a concurrent run that must be rejected.
+ auroraCronJob.doExecute(context);
+
+ // Complete previous run and trigger another one.
+ killResult.complete(BatchWorker.NO_RESULT);
+ auroraCronJob.doExecute(context);
+ assertTrue(jobDetails.getJobDataMap().isEmpty());
+ }
+
+ @Test
+ public void testNoConcurrentRun() throws Exception {
+ jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+
+ control.replay();
+
+ auroraCronJob.doExecute(context);
}
private void populateTaskStore() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1c0a3fa..8556253 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.Container;
@@ -34,6 +35,7 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CrontabEntry;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -95,6 +97,8 @@ public class CronIT extends EasyMockTest {
bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
bind(StateManager.class).toInstance(stateManager);
bind(Storage.class).toInstance(storage);
+ bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
+ bind(EventSink.class).toInstance(createMock(EventSink.class));
}
});
}