You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/16 21:45:18 UTC

[2/3] aurora git commit: Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.

Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.

Reviewed at https://reviews.apache.org/r/51763/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2cb43d61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2cb43d61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2cb43d61

Branch: refs/heads/master
Commit: 2cb43d61ecafb79b31d36332ef4713b9857b3c1a
Parents: ebfeb3e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:26 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:26 2016 -0700

----------------------------------------------------------------------
 .../aurora/common/util/BackoffHelper.java       |   8 +
 .../aurora/common/util/BackoffHelperTest.java   |   7 +
 .../scheduler/cron/quartz/AuroraCronJob.java    | 239 +++++++++++--------
 .../scheduler/cron/quartz/CronModule.java       |  25 +-
 .../cron/quartz/AuroraCronJobTest.java          | 107 ++++++---
 .../aurora/scheduler/cron/quartz/CronIT.java    |   4 +
 6 files changed, 256 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
index 8e73dd9..517c0ef 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
@@ -90,6 +90,14 @@ public class BackoffHelper {
   }
 
   /**
+   * Returns the {@link BackoffStrategy} that this BackoffHelper was initialized with.
+   * @return the {@link BackoffStrategy} this helper uses to space out retries of a task.
+   */
+  public BackoffStrategy getBackoffStrategy() {
+    return backoffStrategy;
+  }
+
+  /**
    * Executes the given task using the configured backoff strategy until the task succeeds as
    * indicated by returning a non-null value.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
index bc30990..012fbac 100644
--- a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
@@ -41,6 +41,13 @@ public class BackoffHelperTest extends EasyMockTest {
   }
 
   @Test
+  public void testGetBackoffStrategy() {
+    control.replay();  // this test records no expectations of its own
+
+    assertEquals(backoffStrategy, backoffHelper.getBackoffStrategy());
+  }
+
+  @Test
   public void testDoUntilSuccess() throws Exception {
     ExceptionalSupplier<Boolean, RuntimeException> task =
         createMock(new Clazz<ExceptionalSupplier<Boolean, RuntimeException>>() { });

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index c07551e..7c8047a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -13,29 +13,37 @@
  */
 package org.apache.aurora.scheduler.cron.quartz;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -43,9 +51,14 @@ import org.quartz.DisallowConcurrentExecution;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -61,7 +74,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
  * scheduler should therefore be configured with a large number of threads.
  */
 @DisallowConcurrentExecution
-class AuroraCronJob implements Job {
+@PersistJobDataAfterExecution
+class AuroraCronJob implements Job, EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(AuroraCronJob.class);
 
   private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
@@ -69,147 +83,181 @@ class AuroraCronJob implements Job {
   private static final AtomicLong CRON_JOB_PARSE_FAILURES =
       Stats.exportLong("cron_job_parse_failures");
   private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+  private static final AtomicLong CRON_JOB_CONCURRENT_RUNS =
+      Stats.exportLong("cron_job_concurrent_runs");
 
   @VisibleForTesting
   static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
 
   private final ConfigurationManager configurationManager;
-  private final Storage storage;
   private final StateManager stateManager;
   private final BackoffHelper delayedStartBackoff;
+  private final BatchWorker<NoResult> batchWorker;
+  // Keys of cron jobs whose delayed KILL_EXISTING launch has completed (successfully or not).
+  // The next trigger for such a key consumes the entry and clears its "work in progress" marker.
+  private final Set<IJobKey> killFollowups = Sets.newConcurrentHashSet();
+
+  /**
+   * Annotation for the max cron batch size.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface CronMaxBatchSize { }
+
+  /**
+   * {@link BatchWorker} for cron triggers; batch size is capped by {@link CronMaxBatchSize}.
+   */
+  static class CronBatchWorker extends BatchWorker<NoResult> {
+    @Inject
+    CronBatchWorker(
+        Storage storage,
+        StatsProvider statsProvider,
+        @CronMaxBatchSize int maxBatchSize) {
+
+      super(storage, statsProvider, maxBatchSize);
+    }
+
+    @Override
+    protected String serviceName() {
+      return "CronBatchWorker";
+    }
+  }
 
   @Inject
   AuroraCronJob(
       ConfigurationManager configurationManager,
       Config config,
-      Storage storage,
-      StateManager stateManager) {
+      StateManager stateManager,
+      CronBatchWorker batchWorker) {
 
     this.configurationManager = requireNonNull(configurationManager);
-    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
+    this.batchWorker = requireNonNull(batchWorker);
     this.delayedStartBackoff = requireNonNull(config.getDelayedStartBackoff());
   }
 
-  private static final class DeferredLaunch {
-    private final ITaskConfig task;
-    private final Set<Integer> instanceIds;
-    private final Set<String> activeTaskIds;
-
-    DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) {
-      this.task = task;
-      this.instanceIds = instanceIds;
-      this.activeTaskIds = activeTaskIds;
-    }
-  }
-
   @Override
   public void execute(JobExecutionContext context) throws JobExecutionException {
     // We assume quartz prevents concurrent runs of this job for a given job key. This allows us
     // to avoid races where we might kill another run's tasks.
     checkState(context.getJobDetail().isConcurrentExectionDisallowed());
 
-    doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+    doExecute(context);
   }
 
   @VisibleForTesting
-  void doExecute(final IJobKey key) throws JobExecutionException {
+  void doExecute(JobExecutionContext context) throws JobExecutionException {
+    final IJobKey key = Quartz.auroraJobKey(context.getJobDetail().getKey());
     final String path = JobKeys.canonicalString(key);
 
-    final Optional<DeferredLaunch> deferredLaunch = storage.write(
-        (MutateWork.Quiet<Optional<DeferredLaunch>>) storeProvider -> {
-          Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
-          if (!config.isPresent()) {
-            LOG.warn(
-                "Cron was triggered for {} but no job with that key was found in storage.",
-                path);
-            CRON_JOB_MISFIRES.incrementAndGet();
-            return Optional.absent();
-          }
-
-          SanitizedCronJob cronJob;
-          try {
-            cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
-          } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
-            LOG.warn(
-                "Invalid cron job for {} in storage - failed to parse with {}", key, e);
-            CRON_JOB_PARSE_FAILURES.incrementAndGet();
-            return Optional.absent();
-          }
-
-          CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
-          LOG.info(
-              "Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
-          CRON_JOB_TRIGGERS.incrementAndGet();
-
-          final Query.Builder activeQuery = Query.jobScoped(key).active();
-          Set<String> activeTasks =
-              Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+    // Prevent a concurrent run for this job in case a previous trigger took longer to run.
+    // This approach relies on saving the "work in progress" token within the job context itself
+    // (see below) and relying on killFollowups to signal "work completion".
+    if (context.getJobDetail().getJobDataMap().containsKey(path)) {
+      if (killFollowups.remove(key)) {
+        context.getJobDetail().getJobDataMap().remove(path);
+        LOG.info("Resetting job context for cron {}", path);
+      } else {
+        CRON_JOB_CONCURRENT_RUNS.incrementAndGet();
+        LOG.info("Ignoring trigger as another concurrent run is active for cron {}", path);
+        return;
+      }
+    }
 
-          ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
-          Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
-          if (activeTasks.isEmpty()) {
-            stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+    CompletableFuture<NoResult> scheduleResult = batchWorker.<NoResult>execute(storeProvider -> {
+      Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
+      if (!config.isPresent()) {
+        LOG.warn("Cron was triggered for {} but no job with that key was found in storage.", path);
+        CRON_JOB_MISFIRES.incrementAndGet();
+        return BatchWorker.NO_RESULT;
+      }
 
-            return Optional.absent();
-          }
+      SanitizedCronJob cronJob;
+      try {
+        cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
+      } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+        LOG.warn("Invalid cron job for {} in storage - failed to parse with {}", key, e);
+        CRON_JOB_PARSE_FAILURES.incrementAndGet();
+        return BatchWorker.NO_RESULT;
+      }
 
-          CRON_JOB_COLLISIONS.incrementAndGet();
-          switch (collisionPolicy) {
-            case KILL_EXISTING:
-              return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks));
+      CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+      LOG.info("Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
+      CRON_JOB_TRIGGERS.incrementAndGet();
 
-            case RUN_OVERLAP:
-              LOG.error("Ignoring trigger for job {} with deprecated collision"
-                  + "policy RUN_OVERLAP due to unterminated active tasks.", path);
-              return Optional.absent();
+      final Query.Builder activeQuery = Query.jobScoped(key).active();
+      Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
 
-            case CANCEL_NEW:
-              return Optional.absent();
+      ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
+      Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
+      if (activeTasks.isEmpty()) {
+        stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+        return BatchWorker.NO_RESULT;
+      }
 
-            default:
-              LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
-              return Optional.absent();
+      CRON_JOB_COLLISIONS.incrementAndGet();
+      switch (collisionPolicy) {
+        case KILL_EXISTING:
+          for (String taskId : activeTasks) {
+            stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.absent(),
+                KILLING,
+                KILL_AUDIT_MESSAGE);
           }
-        }
-    );
 
-    if (!deferredLaunch.isPresent()) {
-      return;
-    }
-
-    storage.write((NoResult.Quiet) storeProvider -> {
-      for (String taskId : deferredLaunch.get().activeTaskIds) {
-        stateManager.changeState(
-            storeProvider,
-            taskId,
-            Optional.absent(),
-            KILLING,
-            KILL_AUDIT_MESSAGE);
+          LOG.info("Waiting for job to terminate before launching cron job {}", path);
+          // Use job detail map to signal a "work in progress" condition to subsequent triggers.
+          context.getJobDetail().getJobDataMap().put(path, null);
+          batchWorker.executeWithReplay(
+              delayedStartBackoff.getBackoffStrategy(),
+              store -> {
+                // Read via the replay's own store provider: the enclosing storeProvider belongs
+                // to the batch that scheduled this replay, which may already have completed.
+                Query.Builder query = Query.taskScoped(activeTasks).active();
+                if (Iterables.isEmpty(store.getTaskStore().fetchTasks(query))) {
+                  LOG.info("Initiating delayed launch of cron {}", path);
+                  stateManager.insertPendingTasks(store, task, instanceIds);
+                  return new BatchWorker.Result<>(true, null);
+                } else {
+                  LOG.info("Not yet safe to run cron {}", path);
+                  return new BatchWorker.Result<>(false, null);
+                }
+              })
+              .whenComplete((ignored, error) -> {
+                // Release the marker even on failure; otherwise every later trigger is ignored.
+                killFollowups.add(key);
+                if (error == null) {
+                  LOG.info("Finished delayed launch for cron {}", path);
+                } else {
+                  LOG.warn("Delayed launch failed for cron {}", path, error);
+                }
+              });
+          break;
+
+        case RUN_OVERLAP:
+          LOG.error("Ignoring trigger for job {} with deprecated collision "
+              + "policy RUN_OVERLAP due to unterminated active tasks.", path);
+          break;
+
+        case CANCEL_NEW:
+          break;
+
+        default:
+          LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
       }
+      return BatchWorker.NO_RESULT;
     });
 
-    LOG.info("Waiting for job to terminate before launching cron job {}.", path);
-
-    final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
     try {
-      // NOTE: We block the quartz execution thread here until we've successfully killed our
-      // ancestor. We mitigate this by using a cached thread pool for quartz.
-      delayedStartBackoff.doUntilSuccess(() -> {
-        if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
-          LOG.info("Initiating delayed launch of cron " + path);
-          storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks(
-              storeProvider,
-              deferredLaunch.get().task,
-              deferredLaunch.get().instanceIds));
-
-          return true;
-        } else {
-          LOG.info("Not yet safe to run cron " + path);
-          return false;
-        }
-      });
-    } catch (InterruptedException e) {
+      scheduleResult.get();
+    } catch (InterruptedException e) {
       LOG.warn("Interrupted while trying to launch cron " + path, e);
       Thread.currentThread().interrupt();
       throw new JobExecutionException(e);
+    } catch (ExecutionException e) {
+      // The batch work itself failed: not an interruption, so leave the interrupt flag alone.
+      LOG.warn("Failed to launch cron {}", path, e);
+      throw new JobExecutionException(e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 155d702..9c88a2a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -21,16 +21,19 @@ import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.args.Arg;
 import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.Positive;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.impl.StdSchedulerFactory;
@@ -38,6 +41,7 @@ import org.quartz.simpl.SimpleThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_ID;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON;
 import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_NAME;
@@ -55,7 +59,7 @@ public class CronModule extends AbstractModule {
 
   @CmdLine(name = "cron_scheduler_num_threads",
       help = "Number of threads to use for the cron scheduler thread pool.")
-  private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+  private static final Arg<Integer> NUM_THREADS = Arg.create(10);
 
   @CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
   private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
@@ -63,13 +67,18 @@ public class CronModule extends AbstractModule {
   @CmdLine(name = "cron_start_initial_backoff", help =
       "Initial backoff delay while waiting for a previous cron run to be killed.")
   public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
-      Arg.create(Amount.of(1L, Time.SECONDS));
+      Arg.create(Amount.of(5L, Time.SECONDS));
 
   @CmdLine(name = "cron_start_max_backoff", help =
       "Max backoff delay while waiting for a previous cron run to be killed.")
   public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
       Arg.create(Amount.of(1L, Time.MINUTES));
 
+  @Positive
+  @CmdLine(name = "cron_scheduling_max_batch_size",
+      help = "The maximum number of triggered cron jobs that can be processed in a batch.")
+  private static final Arg<Integer> CRON_MAX_BATCH_SIZE = Arg.create(10);
+
   // Global per-JVM ID number generator for the provided Quartz Scheduler.
   private static final AtomicLong ID_GENERATOR = new AtomicLong();
 
@@ -90,8 +99,16 @@ public class CronModule extends AbstractModule {
     bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
         new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
 
+    PubsubEventModule.bindSubscriber(binder(), AuroraCronJob.class);
+
     bind(CronLifecycle.class).in(Singleton.class);
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+    addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(AuroraCronJob.CronMaxBatchSize.class)
+        .toInstance(CRON_MAX_BATCH_SIZE.get());  // injected into CronBatchWorker
+    bind(CronBatchWorker.class).in(Singleton.class);  // shared by every AuroraCronJob
+    addSchedulerActiveServiceBinding(binder()).to(CronBatchWorker.class);
   }
 
   @Provides
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 5c64ff2..fb06c28 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -13,17 +13,23 @@
  */
 package org.apache.aurora.scheduler.cron.quartz;
 
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.aurora.common.base.ExceptionalSupplier;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.CronCollisionPolicy;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.RepeatableWork;
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -31,11 +37,17 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.Capture;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.impl.JobDetailImpl;
 
+import static org.apache.aurora.scheduler.cron.quartz.QuartzTestUtil.AURORA_JOB_KEY;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -43,50 +55,61 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AuroraCronJobTest extends EasyMockTest {
-  public static final String TASK_ID = "A";
+  private static final String TASK_ID = "A";
+  private JobDetailImpl jobDetails;
   private Storage storage;
   private StateManager stateManager;
   private BackoffHelper backoffHelper;
-
+  private CronBatchWorker batchWorker;
+  private JobExecutionContext context;
   private AuroraCronJob auroraCronJob;
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     storage = DbUtil.createStorage();
     stateManager = createMock(StateManager.class);
     backoffHelper = createMock(BackoffHelper.class);
+    context = createMock(JobExecutionContext.class);
+
+    jobDetails = new JobDetailImpl();
+    jobDetails.setKey(Quartz.jobKey(AURORA_JOB_KEY));
+    jobDetails.setJobDataMap(new JobDataMap(new HashMap<>()));
+    expect(context.getJobDetail()).andReturn(jobDetails).anyTimes();
+
+    batchWorker = createMock(CronBatchWorker.class);
+    expectBatchExecute(batchWorker, storage, control).anyTimes();
 
     auroraCronJob = new AuroraCronJob(
         TaskTestUtil.CONFIGURATION_MANAGER,
-        new AuroraCronJob.Config(backoffHelper), storage, stateManager);
+        new AuroraCronJob.Config(backoffHelper),
+        stateManager,
+        batchWorker);
   }
 
   @Test
   public void testExecuteNonexistentIsNoop() throws JobExecutionException {
     control.replay();
 
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
   public void testEmptyStorage() throws JobExecutionException {
-    stateManager.insertPendingTasks(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject());
+    stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
     expectLastCall().times(3);
 
     control.replay();
+
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    storage = DbUtil.createStorage();
+    auroraCronJob.doExecute(context);
 
-    populateStorage(CronCollisionPolicy.KILL_EXISTING);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
     storage = DbUtil.createStorage();
+    populateStorage(CronCollisionPolicy.KILL_EXISTING);
+    auroraCronJob.doExecute(context);
 
+    storage = DbUtil.createStorage();  // NOTE(review): batchWorker keeps setUp's storage
     populateStorage(CronCollisionPolicy.RUN_OVERLAP);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
@@ -95,35 +118,65 @@ public class AuroraCronJobTest extends EasyMockTest {
 
     populateTaskStore();
     populateStorage(CronCollisionPolicy.CANCEL_NEW);
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+    auroraCronJob.doExecute(context);
+  }
+
+  @Test
+  public void testOverlap() throws JobExecutionException {
+    control.replay();
+
+    populateTaskStore();
+    populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+    auroraCronJob.doExecute(context);
   }
 
   @Test
   public void testKillExisting() throws Exception {
-    Capture<ExceptionalSupplier<Boolean, RuntimeException>> capture = createCapture();
+    Capture<RepeatableWork<BatchWorker.NoResult>> killCapture = createCapture();
+    CompletableFuture<BatchWorker.NoResult> killResult = new CompletableFuture<>();
+    expect(batchWorker.executeWithReplay(anyObject(), capture(killCapture))).andReturn(killResult);
 
+    expect(backoffHelper.getBackoffStrategy()).andReturn(null).anyTimes();
     expect(stateManager.changeState(
-        EasyMock.anyObject(),
+        anyObject(),
         eq(TASK_ID),
         eq(Optional.absent()),
         eq(ScheduleStatus.KILLING),
         eq(AuroraCronJob.KILL_AUDIT_MESSAGE)))
         .andReturn(StateChangeResult.SUCCESS);
-    backoffHelper.doUntilSuccess(EasyMock.capture(capture));
-    stateManager.insertPendingTasks(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject());
+    stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
+    expectLastCall().times(2);
 
     control.replay();
 
     populateStorage(CronCollisionPolicy.KILL_EXISTING);
     populateTaskStore();
-    auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
-    assertFalse(capture.getValue().get());
+    auroraCronJob.doExecute(context);
+
     storage.write(
         (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
-    assertTrue(capture.getValue().get());
+    storage.write((NoResult.Quiet) store -> killCapture.getValue().apply(store));
+
+    // Simulate a trigger in progress.
+    jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+    assertFalse(jobDetails.getJobDataMap().isEmpty());
+
+    // Attempt a concurrent run that must be rejected.
+    auroraCronJob.doExecute(context);
+
+    // Complete previous run and trigger another one.
+    killResult.complete(BatchWorker.NO_RESULT);
+    auroraCronJob.doExecute(context);
+    assertTrue(jobDetails.getJobDataMap().isEmpty());  // marker cleared by the follow-up run
+  }
+
+  @Test
+  public void testNoConcurrentRun() throws Exception {
+    jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+
+    control.replay();
+
+    auroraCronJob.doExecute(context);
   }
 
   private void populateTaskStore() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1c0a3fa..8556253 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.util.Modules;
 
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.Container;
@@ -34,6 +35,7 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -95,6 +97,8 @@ public class CronIT extends EasyMockTest {
             bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
             bind(StateManager.class).toInstance(stateManager);
             bind(Storage.class).toInstance(storage);
+            bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
+            bind(EventSink.class).toInstance(createMock(EventSink.class));
           }
         });
   }